<!DOCTYPE html>
<html lang="en"><head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1"><!-- Begin Jekyll SEO tag v2.8.0 -->
<title>Streaming data into Apache HBase using Apache Flume | Blogs Archive</title>
<meta name="generator" content="Jekyll v3.9.3" />
<meta property="og:title" content="Streaming data into Apache HBase using Apache Flume" />
<meta property="og:locale" content="en_US" />
<meta name="description" content="Apache Flume was conceived as a fault-tolerant ingest system for the Apache Hadoop ecosystem. Flume comes packaged with an HDFS Sink, which can be used to write events into HDFS, and two different implementations of HBase sinks to write events into HBase. You can read about the basic architecture of Apache Flume 1.x in this blog post. You can also read about how Flume&rsquo;s File Channel persists events and still provides extremely high performance in an earlier blog post. In this article, we will explore how to configure Flume to write events into HBase, and how to write custom serializers that write events into HBase in a format of the user&rsquo;s choice. Data is stored in HBase as tables. Each table has one or more column families, and each column family has one or more columns. HBase stores all columns in a column family in physical proximity. Each row is identified by a key known as the row key. To insert data into HBase, the table name, column family, column name and row key have to be specified. More details on the HBase data model can be found in the HBase documentation. Flume has two HBase sinks, the HBaseSink (org.apache.flume.sink.hbase.HBaseSink) and the AsyncHBaseSink (org.apache.flume.sink.hbase.AsyncHBaseSink). These two sinks will eventually converge to similar functionality, but currently each has some advantages over the other: the AsyncHBaseSink currently gives better performance than the HBaseSink, primarily because it makes non-blocking calls to HBase; the HBaseSink will soon support secure HBase clusters (FLUME-1626) and the new HBase IPC introduced in HBase 0.96. The configuration for both of these sinks is very similar. 
A sample configuration is shown below: #Use the AsyncHBaseSink host1.sinks.sink1.type = org.apache.flume.sink.hbase.AsyncHBaseSink #Use the HBaseSink #host1.sinks.sink1.type = org.apache.flume.sink.hbase.HBaseSink host1.sinks.sink1.channel = ch1 host1.sinks.sink1.table = transactions host1.sinks.sink1.columnFamily = clients host1.sinks.sink1.column = charges host1.sinks.sink1.batchSize = 5000 #Use the SimpleAsyncHbaseEventSerializer that comes with Flume host1.sinks.sink1.serializer = org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer #Use the SimpleHbaseEventSerializer that comes with Flume #host1.sinks.sink1.serializer = org.apache.flume.sink.hbase.SimpleHbaseEventSerializer host1.sinks.sink1.serializer.incrementColumn = icol host1.channels.ch1.type = memory In the above config, the &ldquo;table&rdquo; parameter specifies the HBase table the sink writes to - in this case, &ldquo;transactions&rdquo;; the &ldquo;columnFamily&rdquo; parameter specifies the column family in that table to insert the data into - in this case, &ldquo;clients&rdquo;; and the &ldquo;column&rdquo; parameter specifies the column in the column family to write to - in this case, &ldquo;charges&rdquo;. Apart from this, the sink requires the channel to be configured, like all other Flume sinks. The other interesting configuration parameters are the &ldquo;serializer&rdquo; and &ldquo;serializer.*&rdquo; parameters. The two sinks use different interfaces for the serializer. In both cases, the serializer is a class that converts the Flume Event into an HBase-friendly format. This piece of code that &ldquo;translates&rdquo; the events is usually specific to the schema of the user&rsquo;s HBase cluster and is usually implemented by the user. All configuration parameters passed in as &ldquo;serializer.*&rdquo; are passed to the serializer. 
This configuration can be used to set up any internal state the serializer needs. In the case of the HBaseSink, the serializer converts a Flume Event into one or more HBase Puts and/or Increments. The serializer must implement the HbaseEventSerializer interface. The serializer is instantiated when the sink is started by the Flume configuration framework. For each event processed by the sink, the sink calls the initialize method in the serializer. The serializer must &ldquo;translate&rdquo; the Flume Event into HBase Puts and Increments, which are returned by the getActions and getIncrements methods. These Puts and Increments are then sent over the wire to the HBase cluster. When the sink stops, this instance of the serializer is closed by the HBaseSink. The AsyncHBaseSink&rsquo;s serializer must implement the AsyncHbaseEventSerializer interface. In this case, the initialize method is called once by the sink, when it starts up. For every event, the sink calls the setEvent method and then the getActions and getIncrements methods - similar to the HBaseSink. When the sink is stopped, the serializer&rsquo;s cleanUp method is called. Notice that these methods do not return the standard HBase Puts and Increments, but PutRequest and AtomicIncrementRequest objects from the asynchbase API. These are roughly equivalent to HBase Puts and Increments respectively, with some differences. An example of such a serializer is below. 
/** * A serializer for the AsyncHBaseSink, which splits the event body into * multiple columns and inserts them into a row whose key is available in * the headers */ public class SplittingSerializer implements AsyncHbaseEventSerializer { private byte[] table; private byte[] colFam; private Event currentEvent; private byte[][] columnNames; private final List&lt;PutRequest&gt; puts = new ArrayList&lt;PutRequest&gt;(); private final List&lt;AtomicIncrementRequest&gt; incs = new ArrayList&lt;AtomicIncrementRequest&gt;(); private byte[] currentRowKey; private final byte[] eventCountCol = &quot;eventCount&quot;.getBytes(); @Override public void initialize(byte[] table, byte[] cf) { this.table = table; this.colFam = cf; } @Override public void setEvent(Event event) { // Set the event and verify that the rowKey header is present this.currentEvent = event; String rowKeyStr = currentEvent.getHeaders().get(&quot;rowKey&quot;); if (rowKeyStr == null) { throw new FlumeException(&quot;No row key found in headers!&quot;); } currentRowKey = rowKeyStr.getBytes(); } @Override public List&lt;PutRequest&gt; getActions() { // Split the event body and get the values for the columns String eventStr = new String(currentEvent.getBody()); String[] cols = eventStr.split(&quot;,&quot;); puts.clear(); for (int i = 0; i &lt; cols.length; i++) { // Generate a PutRequest for each column. 
PutRequest req = new PutRequest(table, currentRowKey, colFam, columnNames[i], cols[i].getBytes()); puts.add(req); } return puts; } @Override public List&lt;AtomicIncrementRequest&gt; getIncrements() { incs.clear(); // Increment the number of events received incs.add(new AtomicIncrementRequest(table, &quot;totalEvents&quot;.getBytes(), colFam, eventCountCol)); return incs; } @Override public void cleanUp() { table = null; colFam = null; currentEvent = null; columnNames = null; currentRowKey = null; } @Override public void configure(Context context) { // Get the column names from the configuration String cols = context.getString(&quot;columns&quot;); String[] names = cols.split(&quot;,&quot;); columnNames = new byte[names.length][]; int i = 0; for (String name : names) { columnNames[i++] = name.getBytes(); } } @Override public void configure(ComponentConfiguration conf) { } } This serializer splits the event body on a delimiter and inserts each split into a different column. The row key is taken from the event header. A counter is also incremented for each event received, to keep track of the number of events. 
This serializer can be set up with the following configuration: host1.sinks.sink1.type = org.apache.flume.sink.hbase.AsyncHBaseSink host1.sinks.sink1.channel = ch1 host1.sinks.sink1.table = transactions host1.sinks.sink1.columnFamily = clients host1.sinks.sink1.batchSize = 5000 #The serializer to use host1.sinks.sink1.serializer = org.apache.flume.sink.hbase.SplittingSerializer #List of columns each event writes to. host1.sinks.sink1.serializer.columns = charges,date,priority Internals of the HBaseSink and AsyncHBaseSink: The HBaseSink uses the HBase HTable API to write events out to HBase. HTable supports batching of Puts, but only HBase 0.92+ supports batching of Increments. Currently, the HBaseSink is single-threaded and calls the serializer to get the Puts and Increments once per event it processes. Puts and Increments are sent to HBase via blocking calls, which means the next event is read and passed to the serializer only once the current event is successfully written to HBase. Each transaction consists of at most the number of events specified by the batchSize property in the configuration. Like all other Flume sinks, if any of these events fails to be written successfully, the sink retries the entire transaction. The AsyncHBaseSink, on the other hand, uses the asynchbase API and sends events to HBase asynchronously. Like the HBaseSink, it generates Puts and Increments for each event. Once the Puts and Increments are generated, the sink sends them out immediately to HBase and moves on to process the next event. Success or failure is handled through callbacks. Again, each transaction consists of at most the number of events specified by the batchSize configuration parameter. The sink waits until either success callbacks are received for all the events sent, or at least one error callback is received. 
If an error callback is received, the entire transaction is retried, in true Flume style. A word of caution: as you can see, if HBase reports failure to write even one Put or Increment, the entire transaction is retried - this is how Flume&rsquo;s at-least-once semantics work, and most Flume sinks operate in the same way. In the case of HBase Increments, this means the same event could cause a counter to be incremented more than once; this is something to keep in mind when using Flume to perform Increments. Also, if the serializer is not idempotent, the same event could cause multiple different Puts to be written to HBase. Imagine a case where each event represents a credit card transaction. If the same event can generate different Puts each time, HBase could end up with multiple records of the same transaction, which is probably not desired. The AsyncHBaseSink is known to give better performance than the HBaseSink, primarily because of the non-blocking nature of the underlying API it uses. The HBase community is working on improving the HBase client API, which would vastly improve HBaseSink performance. Conclusion: Flume is an excellent tool for writing events out to the different storage systems in the Hadoop ecosystem, including HBase. The HBase sinks provide the functionality to write data to HBase in your own schema and allow the user to &ldquo;map&rdquo; the Flume event to HBase data." />
<meta property="og:description" content="Apache Flume was conceived as a fault-tolerant ingest system for the Apache Hadoop ecosystem. Flume comes packaged with an HDFS Sink, which can be used to write events into HDFS, and two different implementations of HBase sinks to write events into HBase. You can read about the basic architecture of Apache Flume 1.x in this blog post. You can also read about how Flume&rsquo;s File Channel persists events and still provides extremely high performance in an earlier blog post. In this article, we will explore how to configure Flume to write events into HBase, and how to write custom serializers that write events into HBase in a format of the user&rsquo;s choice. Data is stored in HBase as tables. Each table has one or more column families, and each column family has one or more columns. HBase stores all columns in a column family in physical proximity. Each row is identified by a key known as the row key. To insert data into HBase, the table name, column family, column name and row key have to be specified. More details on the HBase data model can be found in the HBase documentation. Flume has two HBase sinks, the HBaseSink (org.apache.flume.sink.hbase.HBaseSink) and the AsyncHBaseSink (org.apache.flume.sink.hbase.AsyncHBaseSink). These two sinks will eventually converge to similar functionality, but currently each has some advantages over the other: the AsyncHBaseSink currently gives better performance than the HBaseSink, primarily because it makes non-blocking calls to HBase; the HBaseSink will soon support secure HBase clusters (FLUME-1626) and the new HBase IPC introduced in HBase 0.96. The configuration for both of these sinks is very similar. 
A sample configuration is shown below: #Use the AsyncHBaseSink host1.sinks.sink1.type = org.apache.flume.sink.hbase.AsyncHBaseSink #Use the HBaseSink #host1.sinks.sink1.type = org.apache.flume.sink.hbase.HBaseSink host1.sinks.sink1.channel = ch1 host1.sinks.sink1.table = transactions host1.sinks.sink1.columnFamily = clients host1.sinks.sink1.column = charges host1.sinks.sink1.batchSize = 5000 #Use the SimpleAsyncHbaseEventSerializer that comes with Flume host1.sinks.sink1.serializer = org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer #Use the SimpleHbaseEventSerializer that comes with Flume #host1.sinks.sink1.serializer = org.apache.flume.sink.hbase.SimpleHbaseEventSerializer host1.sinks.sink1.serializer.incrementColumn = icol host1.channels.ch1.type = memory In the above config, the &ldquo;table&rdquo; parameter specifies the HBase table the sink writes to - in this case, &ldquo;transactions&rdquo;; the &ldquo;columnFamily&rdquo; parameter specifies the column family in that table to insert the data into - in this case, &ldquo;clients&rdquo;; and the &ldquo;column&rdquo; parameter specifies the column in the column family to write to - in this case, &ldquo;charges&rdquo;. Apart from this, the sink requires the channel to be configured, like all other Flume sinks. The other interesting configuration parameters are the &ldquo;serializer&rdquo; and &ldquo;serializer.*&rdquo; parameters. The two sinks use different interfaces for the serializer. In both cases, the serializer is a class that converts the Flume Event into an HBase-friendly format. This piece of code that &ldquo;translates&rdquo; the events is usually specific to the schema of the user&rsquo;s HBase cluster and is usually implemented by the user. All configuration parameters passed in as &ldquo;serializer.*&rdquo; are passed to the serializer. 
This configuration can be used to set up any internal state the serializer needs. In the case of the HBaseSink, the serializer converts a Flume Event into one or more HBase Puts and/or Increments. The serializer must implement the HbaseEventSerializer interface. The serializer is instantiated when the sink is started by the Flume configuration framework. For each event processed by the sink, the sink calls the initialize method in the serializer. The serializer must &ldquo;translate&rdquo; the Flume Event into HBase Puts and Increments, which are returned by the getActions and getIncrements methods. These Puts and Increments are then sent over the wire to the HBase cluster. When the sink stops, this instance of the serializer is closed by the HBaseSink. The AsyncHBaseSink&rsquo;s serializer must implement the AsyncHbaseEventSerializer interface. In this case, the initialize method is called once by the sink, when it starts up. For every event, the sink calls the setEvent method and then the getActions and getIncrements methods - similar to the HBaseSink. When the sink is stopped, the serializer&rsquo;s cleanUp method is called. Notice that these methods do not return the standard HBase Puts and Increments, but PutRequest and AtomicIncrementRequest objects from the asynchbase API. These are roughly equivalent to HBase Puts and Increments respectively, with some differences. An example of such a serializer is below. 
/** * A serializer for the AsyncHBaseSink, which splits the event body into * multiple columns and inserts them into a row whose key is available in * the headers */ public class SplittingSerializer implements AsyncHbaseEventSerializer { private byte[] table; private byte[] colFam; private Event currentEvent; private byte[][] columnNames; private final List&lt;PutRequest&gt; puts = new ArrayList&lt;PutRequest&gt;(); private final List&lt;AtomicIncrementRequest&gt; incs = new ArrayList&lt;AtomicIncrementRequest&gt;(); private byte[] currentRowKey; private final byte[] eventCountCol = &quot;eventCount&quot;.getBytes(); @Override public void initialize(byte[] table, byte[] cf) { this.table = table; this.colFam = cf; } @Override public void setEvent(Event event) { // Set the event and verify that the rowKey header is present this.currentEvent = event; String rowKeyStr = currentEvent.getHeaders().get(&quot;rowKey&quot;); if (rowKeyStr == null) { throw new FlumeException(&quot;No row key found in headers!&quot;); } currentRowKey = rowKeyStr.getBytes(); } @Override public List&lt;PutRequest&gt; getActions() { // Split the event body and get the values for the columns String eventStr = new String(currentEvent.getBody()); String[] cols = eventStr.split(&quot;,&quot;); puts.clear(); for (int i = 0; i &lt; cols.length; i++) { // Generate a PutRequest for each column. 
PutRequest req = new PutRequest(table, currentRowKey, colFam, columnNames[i], cols[i].getBytes()); puts.add(req); } return puts; } @Override public List&lt;AtomicIncrementRequest&gt; getIncrements() { incs.clear(); // Increment the number of events received incs.add(new AtomicIncrementRequest(table, &quot;totalEvents&quot;.getBytes(), colFam, eventCountCol)); return incs; } @Override public void cleanUp() { table = null; colFam = null; currentEvent = null; columnNames = null; currentRowKey = null; } @Override public void configure(Context context) { // Get the column names from the configuration String cols = context.getString(&quot;columns&quot;); String[] names = cols.split(&quot;,&quot;); columnNames = new byte[names.length][]; int i = 0; for (String name : names) { columnNames[i++] = name.getBytes(); } } @Override public void configure(ComponentConfiguration conf) { } } This serializer splits the event body on a delimiter and inserts each split into a different column. The row key is taken from the event header. A counter is also incremented for each event received, to keep track of the number of events. 
This serializer can be set up with the following configuration: host1.sinks.sink1.type = org.apache.flume.sink.hbase.AsyncHBaseSink host1.sinks.sink1.channel = ch1 host1.sinks.sink1.table = transactions host1.sinks.sink1.columnFamily = clients host1.sinks.sink1.batchSize = 5000 #The serializer to use host1.sinks.sink1.serializer = org.apache.flume.sink.hbase.SplittingSerializer #List of columns each event writes to. host1.sinks.sink1.serializer.columns = charges,date,priority Internals of the HBaseSink and AsyncHBaseSink: The HBaseSink uses the HBase HTable API to write events out to HBase. HTable supports batching of Puts, but only HBase 0.92+ supports batching of Increments. Currently, the HBaseSink is single-threaded and calls the serializer to get the Puts and Increments once per event it processes. Puts and Increments are sent to HBase via blocking calls, which means the next event is read and passed to the serializer only once the current event is successfully written to HBase. Each transaction consists of at most the number of events specified by the batchSize property in the configuration. Like all other Flume sinks, if any of these events fails to be written successfully, the sink retries the entire transaction. The AsyncHBaseSink, on the other hand, uses the asynchbase API and sends events to HBase asynchronously. Like the HBaseSink, it generates Puts and Increments for each event. Once the Puts and Increments are generated, the sink sends them out immediately to HBase and moves on to process the next event. Success or failure is handled through callbacks. Again, each transaction consists of at most the number of events specified by the batchSize configuration parameter. The sink waits until either success callbacks are received for all the events sent, or at least one error callback is received. 
If an error callback is received, the entire transaction is retried, in true Flume style. A word of caution: as you can see, if HBase reports failure to write even one Put or Increment, the entire transaction is retried - this is how Flume&rsquo;s at-least-once semantics work, and most Flume sinks operate in the same way. In the case of HBase Increments, this means the same event could cause a counter to be incremented more than once; this is something to keep in mind when using Flume to perform Increments. Also, if the serializer is not idempotent, the same event could cause multiple different Puts to be written to HBase. Imagine a case where each event represents a credit card transaction. If the same event can generate different Puts each time, HBase could end up with multiple records of the same transaction, which is probably not desired. The AsyncHBaseSink is known to give better performance than the HBaseSink, primarily because of the non-blocking nature of the underlying API it uses. The HBase community is working on improving the HBase client API, which would vastly improve HBaseSink performance. Conclusion: Flume is an excellent tool for writing events out to the different storage systems in the Hadoop ecosystem, including HBase. The HBase sinks provide the functionality to write data to HBase in your own schema and allow the user to &ldquo;map&rdquo; the Flume event to HBase data." />
<link rel="canonical" href="http://localhost:4000/flume/entry/streaming_data_into_apache_hbase" />
<meta property="og:url" content="http://localhost:4000/flume/entry/streaming_data_into_apache_hbase" />
<meta property="og:site_name" content="Blogs Archive" />
<meta property="og:type" content="article" />
<meta property="article:published_time" content="2012-11-27T21:39:20-05:00" />
<meta name="twitter:card" content="summary" />
<meta property="twitter:title" content="Streaming data into Apache HBase using Apache Flume" />
<script type="application/ld+json">
{"@context":"https://schema.org","@type":"BlogPosting","dateModified":"2012-11-27T21:39:20-05:00","datePublished":"2012-11-27T21:39:20-05:00","description":"Apache Flume was conceived as a fault-tolerant ingest system for the Apache Hadoop ecosystem. Flume comes packaged with an HDFS Sink, which can be used to write events into HDFS, and two different implementations of HBase sinks to write events into HBase. You can read about the basic architecture of Apache Flume 1.x in this blog post. You can also read about how Flume&rsquo;s File Channel persists events and still provides extremely high performance in an earlier blog post. In this article, we will explore how to configure Flume to write events into HBase, and how to write custom serializers that write events into HBase in a format of the user&rsquo;s choice. Data is stored in HBase as tables. Each table has one or more column families, and each column family has one or more columns. HBase stores all columns in a column family in physical proximity. Each row is identified by a key known as the row key. To insert data into HBase, the table name, column family, column name and row key have to be specified. More details on the HBase data model can be found in the HBase documentation. Flume has two HBase sinks, the HBaseSink (org.apache.flume.sink.hbase.HBaseSink) and the AsyncHBaseSink (org.apache.flume.sink.hbase.AsyncHBaseSink). These two sinks will eventually converge to similar functionality, but currently each has some advantages over the other: the AsyncHBaseSink currently gives better performance than the HBaseSink, primarily because it makes non-blocking calls to HBase; the HBaseSink will soon support secure HBase clusters (FLUME-1626) and the new HBase IPC introduced in HBase 0.96. The configuration for both of these sinks is very similar. 
A sample configuration is shown below: #Use the AsyncHBaseSink host1.sinks.sink1.type = org.apache.flume.sink.hbase.AsyncHBaseSink #Use the HBaseSink #host1.sinks.sink1.type = org.apache.flume.sink.hbase.HBaseSink host1.sinks.sink1.channel = ch1 host1.sinks.sink1.table = transactions host1.sinks.sink1.columnFamily = clients host1.sinks.sink1.column = charges host1.sinks.sink1.batchSize = 5000 #Use the SimpleAsyncHbaseEventSerializer that comes with Flume host1.sinks.sink1.serializer = org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer #Use the SimpleHbaseEventSerializer that comes with Flume #host1.sinks.sink1.serializer = org.apache.flume.sink.hbase.SimpleHbaseEventSerializer host1.sinks.sink1.serializer.incrementColumn = icol host1.channels.ch1.type = memory In the above config, the &ldquo;table&rdquo; parameter specifies the HBase table the sink writes to - in this case, &ldquo;transactions&rdquo;; the &ldquo;columnFamily&rdquo; parameter specifies the column family in that table to insert the data into - in this case, &ldquo;clients&rdquo;; and the &ldquo;column&rdquo; parameter specifies the column in the column family to write to - in this case, &ldquo;charges&rdquo;. Apart from this, the sink requires the channel to be configured, like all other Flume sinks. The other interesting configuration parameters are the &ldquo;serializer&rdquo; and &ldquo;serializer.*&rdquo; parameters. The two sinks use different interfaces for the serializer. In both cases, the serializer is a class that converts the Flume Event into an HBase-friendly format. This piece of code that &ldquo;translates&rdquo; the events is usually specific to the schema of the user&rsquo;s HBase cluster and is usually implemented by the user. All configuration parameters passed in as &ldquo;serializer.*&rdquo; are passed to the serializer. 
This configuration can be used to set up any internal state the serializer needs. In the case of the HBaseSink, the serializer converts a Flume Event into one or more HBase Puts and/or Increments. The serializer must implement the HbaseEventSerializer interface. The serializer is instantiated when the sink is started by the Flume configuration framework. For each event processed by the sink, the sink calls the initialize method in the serializer. The serializer must &ldquo;translate&rdquo; the Flume Event into HBase Puts and Increments, which are returned by the getActions and getIncrements methods. These Puts and Increments are then sent over the wire to the HBase cluster. When the sink stops, this instance of the serializer is closed by the HBaseSink. The AsyncHBaseSink&rsquo;s serializer must implement the AsyncHbaseEventSerializer interface. In this case, the initialize method is called once by the sink, when it starts up. For every event, the sink calls the setEvent method and then the getActions and getIncrements methods - similar to the HBaseSink. When the sink is stopped, the serializer&rsquo;s cleanUp method is called. Notice that these methods do not return the standard HBase Puts and Increments, but PutRequest and AtomicIncrementRequest objects from the asynchbase API. These are roughly equivalent to HBase Puts and Increments respectively, with some differences. An example of such a serializer is below. 
/** * A serializer for the AsyncHBaseSink, which splits the event body into * multiple columns and inserts them into a row whose key is available in * the headers */ public class SplittingSerializer implements AsyncHbaseEventSerializer { private byte[] table; private byte[] colFam; private Event currentEvent; private byte[][] columnNames; private final List&lt;PutRequest&gt; puts = new ArrayList&lt;PutRequest&gt;(); private final List&lt;AtomicIncrementRequest&gt; incs = new ArrayList&lt;AtomicIncrementRequest&gt;(); private byte[] currentRowKey; private final byte[] eventCountCol = &quot;eventCount&quot;.getBytes(); @Override public void initialize(byte[] table, byte[] cf) { this.table = table; this.colFam = cf; } @Override public void setEvent(Event event) { // Set the event and verify that the rowKey header is present this.currentEvent = event; String rowKeyStr = currentEvent.getHeaders().get(&quot;rowKey&quot;); if (rowKeyStr == null) { throw new FlumeException(&quot;No row key found in headers!&quot;); } currentRowKey = rowKeyStr.getBytes(); } @Override public List&lt;PutRequest&gt; getActions() { // Split the event body and get the values for the columns String eventStr = new String(currentEvent.getBody()); String[] cols = eventStr.split(&quot;,&quot;); puts.clear(); for (int i = 0; i &lt; cols.length; i++) { // Generate a PutRequest for each column. 
PutRequest req = new PutRequest(table, currentRowKey, colFam, columnNames[i], cols[i].getBytes()); puts.add(req); } return puts; } @Override public List&lt;AtomicIncrementRequest&gt; getIncrements() { incs.clear(); // Increment the number of events received incs.add(new AtomicIncrementRequest(table, &quot;totalEvents&quot;.getBytes(), colFam, eventCountCol)); return incs; } @Override public void cleanUp() { table = null; colFam = null; currentEvent = null; columnNames = null; currentRowKey = null; } @Override public void configure(Context context) { // Get the column names from the configuration String cols = context.getString(&quot;columns&quot;); String[] names = cols.split(&quot;,&quot;); columnNames = new byte[names.length][]; int i = 0; for (String name : names) { columnNames[i++] = name.getBytes(); } } @Override public void configure(ComponentConfiguration conf) { } } This serializer splits the event body on a delimiter and inserts each split into a different column. The row key is taken from the event header. A counter is also incremented for each event received, to keep track of the number of events. 
This serializer can be set up with the following configuration: host1.sinks.sink1.type = org.apache.flume.sink.hbase.AsyncHBaseSink host1.sinks.sink1.channel = ch1 host1.sinks.sink1.table = transactions host1.sinks.sink1.columnFamily = clients host1.sinks.sink1.batchSize = 5000 #The serializer to use host1.sinks.sink1.serializer = org.apache.flume.sink.hbase.SplittingSerializer #List of columns each event writes to. host1.sinks.sink1.serializer.columns = charges,date,priority Internals of the HBaseSink and AsyncHBaseSink: The HBaseSink uses the HBase HTable API to write events out to HBase. HTable supports batching of Puts, but only HBase 0.92+ supports batching of Increments. Currently, the HBaseSink is single-threaded and calls the serializer to get the Puts and Increments once per event it processes. Puts and Increments are sent to HBase via blocking calls, which means the next event is read and passed to the serializer only once the current event is successfully written to HBase. Each transaction consists of at most the number of events specified by the batchSize property in the configuration. Like all other Flume sinks, if any of these events fails to be written successfully, the sink retries the entire transaction. The AsyncHBaseSink, on the other hand, uses the asynchbase API and sends events to HBase asynchronously. Like the HBaseSink, it generates Puts and Increments for each event. Once the Puts and Increments are generated, the sink sends them out immediately to HBase and moves on to process the next event. Success or failure is handled through callbacks. Again, each transaction consists of at most the number of events specified by the batchSize configuration parameter. The sink waits until either success callbacks are received for all the events sent, or at least one error callback is received. 
If an error callback is received, the entire transaction is retried, in true Flume style. A word of caution: as you can see, if HBase reports failure to write even one Put or Increment, the entire transaction is retried - this is how Flume&rsquo;s at-least-once semantics work, and most Flume sinks operate in the same way. In the case of HBase Increments, this means the same event could cause a counter to be incremented more than once; this is something to keep in mind when using Flume to perform Increments. Also, if the serializer is not idempotent, the same event could cause multiple different Puts to be written to HBase. Imagine a case where each event represents a credit card transaction. If the same event can generate different Puts each time, HBase could end up with multiple records of the same transaction, which is probably not desired. The AsyncHBaseSink is known to give better performance than the HBaseSink, primarily because of the non-blocking nature of the underlying API it uses. The HBase community is working on improving the HBase client API, which would vastly improve HBaseSink performance. Conclusion: Flume is an excellent tool for writing events out to the different storage systems in the Hadoop ecosystem, including HBase. The HBase sinks provide the functionality to write data to HBase in your own schema and allow the user to &ldquo;map&rdquo; the Flume event to HBase data.","headline":"Streaming data into Apache HBase using Apache Flume","mainEntityOfPage":{"@type":"WebPage","@id":"http://localhost:4000/flume/entry/streaming_data_into_apache_hbase"},"url":"http://localhost:4000/flume/entry/streaming_data_into_apache_hbase"}</script>
<!-- End Jekyll SEO tag -->
<link rel="stylesheet" href="/assets/main.css"><link type="application/atom+xml" rel="alternate" href="http://localhost:4000/feed.xml" title="Blogs Archive" /></head>
<body><header class="site-header" role="banner">
<div class="wrapper"><a class="site-title" rel="author" href="/">Blogs Archive</a><nav class="site-nav">
<input type="checkbox" id="nav-trigger" class="nav-trigger" />
<label for="nav-trigger">
<span class="menu-icon">
<svg viewBox="0 0 18 15" width="18px" height="15px">
<path d="M18,1.484c0,0.82-0.665,1.484-1.484,1.484H1.484C0.665,2.969,0,2.304,0,1.484l0,0C0,0.665,0.665,0,1.484,0 h15.032C17.335,0,18,0.665,18,1.484L18,1.484z M18,7.516C18,8.335,17.335,9,16.516,9H1.484C0.665,9,0,8.335,0,7.516l0,0 c0-0.82,0.665-1.484,1.484-1.484h15.032C17.335,6.031,18,6.696,18,7.516L18,7.516z M18,13.516C18,14.335,17.335,15,16.516,15H1.484 C0.665,15,0,14.335,0,13.516l0,0c0-0.82,0.665-1.483,1.484-1.483h15.032C17.335,12.031,18,12.695,18,13.516L18,13.516z"/>
</svg>
</span>
</label>
<div class="trigger"><a class="page-link" href="/about/">About</a></div>
</nav></div>
</header>
<main class="page-content" aria-label="Content">
<div class="wrapper">
<article class="post h-entry" itemscope itemtype="http://schema.org/BlogPosting">
<header class="post-header">
<h1 class="post-title p-name" itemprop="name headline">Streaming data into Apache HBase using Apache Flume</h1>
<p class="post-meta">
<time class="dt-published" datetime="2012-11-27T21:39:20-05:00" itemprop="datePublished">Nov 27, 2012
</time>• <span itemprop="author" itemscope itemtype="http://schema.org/Person"><span class="p-author h-card" itemprop="name">hshreedharan</span></span></p>
</header>
<div class="post-content e-content" itemprop="articleBody">
<p><strong id="internal-source-marker_0.83681909320876" style="font-weight: normal;"><font face="georgia, times new roman, times, serif"><font style="font-size: medium;"><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">Apache Flume was conceived as a fault-tolerant ingest system for the Apache Hadoop ecosystem. Flume comes packaged with an HDFS Sink which can be used to write events into HDFS, and two different implementations of HBase sinks to write events into HBase. You can read about the basic architecture of Apache Flume 1.x </span><a href="https://blogs.apache.org/flume/entry/flume_ng_architecture"><span style="font-size: 15px; color: #1155cc; background-color: transparent; text-decoration: underline; vertical-align: baseline; white-space: pre-wrap;">in this blog post</span></a><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">. </span></font></font><strong id="internal-source-marker_0.83681909320876" style="font-size: medium; font-weight: normal;"><font face="georgia, times new roman, times, serif"><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> You can also read about how Flume&rsquo;s File Channel persists events and still provides extremely high performance in an </span><a href="https://blogs.apache.org/flume/entry/apache_flume_filechannel"><span style="font-size: 15px; color: #1155cc; background-color: transparent; text-decoration: underline; vertical-align: baseline; white-space: pre-wrap;">earlier blog post</span></a><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">. 
</span></font></strong><font face="georgia, times new roman, times, serif"><font style="font-size: medium;"><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">In this article, we will explore how to configure Flume to write events into HBase, and write custom serializers to write events into HBase in a format of the user&rsquo;s choice. </span><br /><span style="font-size: 19px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"></span><br /><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">Data is stored in HBase as tables. Each table has one or more column families, and each column family has one or more columns. HBase stores all columns in a column family in physical proximity. Each row is identified by a key known as the row key. To insert data into HBase, the table name, column family, column name and row key have to be specified. More details on the HBase data model can be found </span><a href="http://hbase.apache.org/book.html#datamodel"><span style="font-size: 15px; color: #1155cc; background-color: transparent; text-decoration: underline; vertical-align: baseline; white-space: pre-wrap;">in the HBase documentation</span></a><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">.</span><br /><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"></span><br /><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">Flume has two HBase Sinks, the HBaseSink(org.apache.flume.sink.hbase.HBaseSink) and AsyncHBaseSink(org.apache.flume.sink.hbase.AsyncHBaseSink). 
These two sinks will eventually converge to similar functionality, but currently each has some advantages over the other:</span><br /><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"></span><br /></font> </font> </p>
<ul style="font-size: medium; margin-top: 0pt; margin-bottom: 0pt;">
<li dir="ltr" style="list-style-type: disc; font-size: 15px; background-color: transparent; vertical-align: baseline;"><span style="background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"><font face="georgia, times new roman, times, serif">The AsyncHBaseSink currently gives better performance than the HBaseSink, primarily because it makes non-blocking calls to HBase.</font></span></li>
<li dir="ltr" style="list-style-type: disc; font-size: 15px; background-color: transparent; vertical-align: baseline;"><font face="georgia, times new roman, times, serif"><span style="background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">The HBaseSink will soon support secure HBase clusters (<a href="https://issues.apache.org/jira/browse/FLUME-1626">FLUME-1626</a>) and the new HBase IPC which was introduced in HBase 0.96.</span></font></li>
</ul>
<p style="font-size: medium;"><font face="georgia, times new roman, times, serif"><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"></span><br /><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">The configuration for both these sinks is very similar. A sample configuration is shown below:</span></font> </p>
<pre><font size="3"><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">#Use the AsyncHBaseSink
</span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">host1.sinks.sink1.type = org.apache.flume.sink.hbase.AsyncHBaseSink
</span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">#Use the HBaseSink
</span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">#host1.sinks.sink1.type = org.apache.flume.sink.hbase.HBaseSink
</span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">host1.sinks.sink1.channel = ch1
</span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">host1.sinks.sink1.table = transactions
</span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">host1.sinks.sink1.columnFamily = clients
</span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">host1.sinks.sink1.column = charges
</span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">host1.sinks.sink1.batchSize = 5000
</span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">#Use the SimpleAsyncHbaseEventSerializer that comes with Flume
</span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">host1.sinks.sink1.serializer = org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
</span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">#Use the SimpleHbaseEventSerializer that comes with Flume
</span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">#host1.sinks.sink1.serializer = org.apache.flume.sink.hbase.SimpleHbaseEventSerializer
</span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">host1.sinks.sink1.serializer.incrementColumn = icol
</span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">host1.channels.ch1.type=memory</span></font></pre>
<p><font face="georgia, times new roman, times, serif"><font><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">In the above config, the &ldquo;table&rdquo; parameter specifies the table in HBase that the sink has to write to - in this case, &ldquo;transactions&rdquo;; the &ldquo;columnFamily&rdquo; parameter specifies the column family in that table to insert the data into, in this case, &ldquo;clients&rdquo;; and the &ldquo;column&rdquo; parameter specifies the column in the column family to write to, in this case &ldquo;charges&rdquo;. Apart from this, the sink requires the channel to be configured, like all other Flume Sinks. The other interesting configuration parameters are the &ldquo;serializer&rdquo; and the &ldquo;serializer.*&rdquo; parameters. The two sinks use different interfaces for the serializer. In both cases, the serializer is a class that converts the Flume Event into an HBase-friendly format. This piece of code that &ldquo;translates&rdquo; the events is specific to the schema of the user&rsquo;s HBase cluster and is therefore usually implemented by the user. All configuration parameters passed in as &ldquo;serializer.*&rdquo; are passed to the serializer. 
This configuration can be used to set up any internal state the serializer needs.</span><br /><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"></span><br /><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">In case of the HBaseSink, the serializer converts a Flume </span><a href="http://flume.apache.org/releases/content/1.2.0/apidocs/org/apache/flume/Event.html" style="font-size: medium;"><span style="font-size: 15px; color: #1155cc; background-color: transparent; text-decoration: underline; vertical-align: baseline; white-space: pre-wrap;">Event</span></a><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> into one or more HBase </span><a href="http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/client/Put.html" style="font-size: medium;"><span style="font-size: 15px; color: #1155cc; background-color: transparent; text-decoration: underline; vertical-align: baseline; white-space: pre-wrap;">Put</span></a><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">s and/or </span><a href="http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/client/Increment.html" style="font-size: medium;"><span style="font-size: 15px; color: #1155cc; background-color: transparent; text-decoration: underline; vertical-align: baseline; white-space: pre-wrap;">Increment</span></a><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">s. 
The serializer must implement the </span><a href="http://flume.apache.org/releases/content/1.2.0/apidocs/org/apache/flume/sink/hbase/HbaseEventSerializer.html" style="font-size: medium;"><span style="font-size: 15px; color: #1155cc; background-color: transparent; text-decoration: underline; vertical-align: baseline; white-space: pre-wrap;">HbaseEventSerializer</span></a><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">. The serializer is instantiated when the sink is started by the Flume configuration framework. For each event processed by the sink, the sink calls the </span><a href="http://flume.apache.org/releases/content/1.2.0/apidocs/org/apache/flume/sink/hbase/HbaseEventSerializer.html#initialize(org.apache.flume.Event,%20byte[])" style="font-size: medium;"><span style="font-size: 15px; color: #1155cc; background-color: transparent; text-decoration: underline; vertical-align: baseline; white-space: pre-wrap;">initialize</span></a><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> method in the serializer. 
The serializer must &ldquo;translate&rdquo; the Flume Event into HBase puts and increments which should be returned by </span><a href="http://flume.apache.org/releases/content/1.2.0/apidocs/org/apache/flume/sink/hbase/HbaseEventSerializer.html#getActions()" style="font-size: medium;"><span style="font-size: 15px; color: #1155cc; background-color: transparent; text-decoration: underline; vertical-align: baseline; white-space: pre-wrap;">getActions</span></a><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> and </span><a href="http://flume.apache.org/releases/content/1.2.0/apidocs/org/apache/flume/sink/hbase/HbaseEventSerializer.html#getIncrements()" style="font-size: medium;"><span style="font-size: 15px; color: #1155cc; background-color: transparent; text-decoration: underline; vertical-align: baseline; white-space: pre-wrap;">getIncrements</span></a><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> methods. &nbsp;These puts and increments are then sent over the wire to the HBase cluster. 
When the sink stops, this instance of the serializer is </span><a href="http://flume.apache.org/releases/content/1.2.0/apidocs/org/apache/flume/sink/hbase/HbaseEventSerializer.html#close()" style="font-size: medium;"><span style="font-size: 15px; color: #1155cc; background-color: transparent; text-decoration: underline; vertical-align: baseline; white-space: pre-wrap;">closed </span></a><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">by the HBaseSink.</span><br /><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"></span><br /><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">The AsyncHBaseSink&rsquo;s serializer must implement </span><a href="http://flume.apache.org/releases/content/1.2.0/apidocs/org/apache/flume/sink/hbase/AsyncHbaseEventSerializer.html" style="font-size: medium;"><span style="font-size: 15px; color: #1155cc; background-color: transparent; text-decoration: underline; vertical-align: baseline; white-space: pre-wrap;">AsyncHbaseEventSerializer</span></a><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">. </span></font> </p>
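<p style="font-size: medium;"><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">The &ldquo;serializer.*&rdquo; passthrough described above is how a serializer typically receives its column mapping. Modeling the Flume Context as a plain map (a stand-in used here for illustration only, not the real class), parsing such a property might look like this:</span></p>

```java
import java.util.Map;

// Illustrative stand-in: shows how a serializer could parse a
// "columns" property (e.g. "charges,date,priority") into the
// column qualifiers it will write to. Class and method names are
// our own, not part of the Flume API.
public class ColumnConfigParser {
  public static byte[][] parseColumns(Map<String, String> context) {
    String cols = context.get("columns");
    if (cols == null) {
      throw new IllegalArgumentException("'columns' property is required");
    }
    String[] names = cols.split(",");
    byte[][] columnNames = new byte[names.length][];
    for (int i = 0; i < names.length; i++) {
      columnNames[i] = names[i].getBytes();
    }
    return columnNames;
  }
}
```

<p style="font-size: medium;"><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">In a real serializer this parsing would live in the configure(Context) callback, with context.getString(&ldquo;columns&rdquo;) supplying the value.</span></p>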
<p style="font-size: medium; display: inline !important;"><strong id="internal-source-marker_0.83681909320876" style="font-weight: normal;"><strong id="internal-source-marker_0.83681909320876" style="font-weight: normal;"><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">In this case, the </span><a href="http://flume.apache.org/releases/content/1.2.0/apidocs/org/apache/flume/sink/hbase/AsyncHbaseEventSerializer.html#initialize(byte[],%20byte[])"><span style="font-size: 15px; color: #1155cc; background-color: transparent; text-decoration: underline; vertical-align: baseline; white-space: pre-wrap;">initialize</span></a><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> method is called once by the sink, when it starts up. For every event, the sink calls the <a href="http://flume.apache.org/releases/content/1.2.0/apidocs/org/apache/flume/sink/hbase/AsyncHbaseEventSerializer.html#setEvent(org.apache.flume.Event)">setEvent</a> method and then calls the </span><a href="http://flume.apache.org/releases/content/1.2.0/apidocs/org/apache/flume/sink/hbase/AsyncHbaseEventSerializer.html#getActions()"><span style="font-size: 15px; color: #1155cc; background-color: transparent; text-decoration: underline; vertical-align: baseline; white-space: pre-wrap;">getActions</span></a><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> and </span><a href="http://flume.apache.org/releases/content/1.2.0/apidocs/org/apache/flume/sink/hbase/AsyncHbaseEventSerializer.html#getIncrements()"><span style="font-size: 15px; color: #1155cc; background-color: transparent; text-decoration: underline; vertical-align: baseline; white-space: pre-wrap;">getIncrements</span></a><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> methods - similar to the HBaseSink. 
When the sink is stopped, the serializer&rsquo;s </span><a href="http://flume.apache.org/releases/content/1.2.0/apidocs/org/apache/flume/sink/hbase/AsyncHbaseEventSerializer.html#cleanUp()"><span style="font-size: 15px; color: #1155cc; background-color: transparent; text-decoration: underline; vertical-align: baseline; white-space: pre-wrap;">cleanUp</span></a><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> method is called. Notice that the methods do not return the standard HBase Puts and Increments, but </span><a href="http://tsunanet.net/~tsuna/asynchbase/api/org/hbase/async/PutRequest.html"><span style="font-size: 15px; color: #1155cc; background-color: transparent; text-decoration: underline; vertical-align: baseline; white-space: pre-wrap;">PutRequest</span></a><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> and </span><a href="http://tsunanet.net/~tsuna/asynchbase/api/org/hbase/async/AtomicIncrementRequest.html"><span style="font-size: 15px; color: #1155cc; background-color: transparent; text-decoration: underline; vertical-align: baseline; white-space: pre-wrap;">AtomicIncrementRequest</span></a><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> from the </span><a href="http://tsunanet.net/~tsuna/asynchbase/api/"><span style="font-size: 15px; color: #1155cc; background-color: transparent; text-decoration: underline; vertical-align: baseline; white-space: pre-wrap;">asynchbase</span></a><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> API. These are roughly equivalent to the HBase Puts and Increments respectively, with some differences. </span></strong></strong></p>
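<p style="font-size: medium;"><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">Stripped of the Flume and asynchbase types, the &ldquo;translation&rdquo; a serializer performs is an ordinary mapping from a delimited event body to (column, value) byte pairs. The following self-contained sketch isolates that splitting step; the class and method names are illustrative only and are not part of the Flume API:</span></p>

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Illustrative only: the body-splitting step a custom serializer would
// perform before wrapping each (column, value) pair in a PutRequest
// (AsyncHBaseSink) or a cell of a Put (HBaseSink).
public class BodySplitter {
  private final byte[][] columnNames;

  public BodySplitter(String... names) {
    columnNames = new byte[names.length][];
    for (int i = 0; i < names.length; i++) {
      columnNames[i] = names[i].getBytes(StandardCharsets.UTF_8);
    }
  }

  // Splits a comma-delimited event body and pairs each field with its
  // configured column qualifier; fields beyond the configured columns
  // are ignored.
  public List<byte[][]> split(byte[] body) {
    String[] fields = new String(body, StandardCharsets.UTF_8).split(",");
    List<byte[][]> cells = new ArrayList<>();
    for (int i = 0; i < fields.length && i < columnNames.length; i++) {
      cells.add(new byte[][] { columnNames[i], fields[i].getBytes(StandardCharsets.UTF_8) });
    }
    return cells;
  }
}
```

<p style="font-size: medium;"><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">Each returned pair would become one write against the configured table, column family and row key.</span></p>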
<p style="font-size: medium;"><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">An example of such a serializer is below.</span></p></font>
<pre><font size="3"><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">/**
</span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> * A serializer for the AsyncHBaseSink, which splits the event body into
</span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> * multiple columns and inserts them into a row whose key is available in
</span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> * the headers
</span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> */
</span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">public class SplittingSerializer implements AsyncHbaseEventSerializer {
</span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> &nbsp;private byte[] table;
</span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> &nbsp;private byte[] colFam;
</span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> &nbsp;private Event currentEvent;
</span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> &nbsp;private byte[][] columnNames;
</span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> &nbsp;private final List&lt;PutRequest&gt; puts = new ArrayList&lt;PutRequest&gt;();
</span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> &nbsp;private final List&lt;AtomicIncrementRequest&gt; incs = new ArrayList&lt;AtomicIncrementRequest&gt;();
</span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> &nbsp;private byte[] currentRowKey;
</span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> &nbsp;private final byte[] eventCountCol = "eventCount".getBytes();</span>
<span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"></span>
<span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> &nbsp;@Override
</span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> &nbsp;public void initialize(byte[] table, byte[] cf) {
</span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> &nbsp;&nbsp;&nbsp;this.table = table;
</span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> &nbsp;&nbsp;&nbsp;this.colFam = cf;
</span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> &nbsp;}</span>
<span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> &nbsp;@Override
</span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> &nbsp;public void setEvent(Event event) {
</span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> &nbsp;&nbsp;&nbsp;// Set the event and verify that the rowKey is not present
</span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> &nbsp;&nbsp;&nbsp;this.currentEvent = event;
</span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> &nbsp;&nbsp;&nbsp;String rowKeyStr = currentEvent.getHeaders().get("rowKey");
</span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> &nbsp;&nbsp;&nbsp;if (rowKeyStr == null) {
</span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;throw new FlumeException("No row key found in headers!");
</span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> &nbsp;&nbsp;&nbsp;}
</span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> &nbsp;&nbsp;&nbsp;currentRowKey = rowKeyStr.getBytes();
</span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> &nbsp;}</span>
<span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"></span>
<span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> &nbsp;@Override
</span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> &nbsp;public List&lt;PutRequest&gt; getActions() {
</span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> &nbsp;&nbsp;&nbsp;// Split the event body and get the values for the columns
</span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> &nbsp;&nbsp;&nbsp;String eventStr = new String(currentEvent.getBody());
</span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> &nbsp;&nbsp;&nbsp;String[] cols = eventStr.split(",");
</span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> &nbsp;&nbsp;&nbsp;puts.clear();
</span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> &nbsp;&nbsp;&nbsp;for (int i = 0; i &lt; cols.length; i++) {
</span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//Generate a PutRequest for each column.
</span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;PutRequest req = new PutRequest(table, currentRowKey, colFam,
</span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;columnNames[i], cols[i].getBytes());
</span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;puts.add(req);
</span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> &nbsp;&nbsp;&nbsp;}
</span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> &nbsp;&nbsp;&nbsp;return puts;
</span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> &nbsp;}</span>
<span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"></span>
<span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> &nbsp;@Override
</span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> &nbsp;public List&lt;AtomicIncrementRequest&gt; getIncrements() {
</span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> &nbsp;&nbsp;&nbsp;incs.clear();
</span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> &nbsp;&nbsp;&nbsp;//Increment the number of events received
</span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> &nbsp;&nbsp;&nbsp;incs.add(new AtomicIncrementRequest(table, "totalEvents".getBytes(), colFam, eventCountCol));
</span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> &nbsp;&nbsp;&nbsp;return incs;
</span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> &nbsp;}</span>
<span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"></span>
<span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> &nbsp;@Override
</span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> &nbsp;public void cleanUp() {
</span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> &nbsp;&nbsp;&nbsp;table = null;
</span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> &nbsp;&nbsp;&nbsp;colFam = null;
</span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> &nbsp;&nbsp;&nbsp;currentEvent = null;
</span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> &nbsp;&nbsp;&nbsp;columnNames = null;
</span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> &nbsp;&nbsp;&nbsp;currentRowKey = null;
</span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> &nbsp;}</span>
<span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"></span>
<span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> &nbsp;@Override
</span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> &nbsp;public void configure(Context context) {
</span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> &nbsp;&nbsp;&nbsp;//Get the column names from the configuration
</span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> &nbsp;&nbsp;&nbsp;String cols = context.getString("columns");
</span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> &nbsp;&nbsp;&nbsp;String[] names = cols.split(",");
</span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> &nbsp;&nbsp;&nbsp;//Assign to the field, not a new local variable that would shadow it
</span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> &nbsp;&nbsp;&nbsp;columnNames = new byte[names.length][];
</span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> &nbsp;&nbsp;&nbsp;int i = 0;
</span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> &nbsp;&nbsp;&nbsp;for(String name : names) {
</span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;columnNames[i++] = name.getBytes();
</span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> &nbsp;&nbsp;&nbsp;}
</span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> &nbsp;}</span>
<span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"></span>
<span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> &nbsp;@Override
</span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> &nbsp;public void configure(ComponentConfiguration conf) {
</span><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"> &nbsp;}</span>
<span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">}</span></font></pre>
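<p style="font-size: medium;"><font face="georgia, times new roman, times, serif"><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">To see what the getActions() loop above produces, here is a minimal, dependency-free sketch of the same splitting logic. The class and method names below are ours, purely for illustration; in the real serializer, each (column, value) pair becomes a PutRequest against the configured table, row key, and column family.</span></font></p>

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Illustrative sketch (not part of Flume): mirrors the serializer's
// getActions() loop, which pairs the i-th comma-separated field of the
// event body with the i-th configured column name.
public class SplittingSketch {
    public static List<Map.Entry<String, String>> toCells(String[] columnNames, String body) {
        List<Map.Entry<String, String>> cells = new ArrayList<>();
        String[] fields = body.split(",");
        // Guard against bodies with more fields than configured columns;
        // the serializer in the post assumes the two counts match.
        int n = Math.min(fields.length, columnNames.length);
        for (int i = 0; i < n; i++) {
            cells.add(new SimpleEntry<>(columnNames[i], fields[i]));
        }
        return cells;
    }

    public static void main(String[] args) {
        String[] columns = {"charges", "date", "priority"};
        for (Map.Entry<String, String> cell : toCells(columns, "34.12,2012-10-29,high")) {
            System.out.println(cell.getKey() + " = " + cell.getValue());
            // prints: charges = 34.12, date = 2012-10-29, priority = high
        }
    }
}
```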
<p style="font-size: medium;"><font face="georgia, times new roman, times, serif"><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">This serializer splits the event body on a delimiter (a comma, in this case) and inserts each token into a separate column. The row key is taken from the event header. For each event received, it also increments a counter that tracks the total number of events.</span></font></p>
<p style="font-size: medium;"><font face="georgia, times new roman, times, serif"><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">This serializer can be configured with the following configuration:</span></font></p>
<pre><font size="3"><span style="font-family: 'Courier New'; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">host1.sinks.sink1.type = org.apache.flume.sink.hbase.AsyncHBaseSink
host1.sinks.sink1.channel = ch1
host1.sinks.sink1.table = transactions
host1.sinks.sink1.columnFamily = clients
host1.sinks.sink1.batchSize = 5000
#The serializer to use
host1.sinks.sink1.serializer = org.apache.flume.sink.hbase.SplittingSerializer
#List of columns each event writes to.
host1.sinks.sink1.serializer.columns = charges,date,priority</span></font></pre>
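<p style="font-size: medium;"><font face="georgia, times new roman, times, serif"><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">Note that the HBase sinks do not create tables: the table and column family named in the configuration must already exist. A typical setup sequence might look like the following sketch (file paths and the properties file name are illustrative):</span></font></p>

```shell
# Create the target table and column family from the HBase shell;
# the sinks expect both to exist before the agent starts:
echo "create 'transactions', 'clients'" | hbase shell

# Start the Flume agent named "host1" with the configuration above:
flume-ng agent --conf /etc/flume-ng/conf \
    --conf-file /etc/flume-ng/conf/hbase-agent.properties \
    --name host1
```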
<p><strong id="internal-source-marker_0.83681909320876" style="font-size: medium; font-weight: normal;"><font face="georgia, times new roman, times, serif"><span style="font-size: 15px; background-color: transparent; font-weight: bold; vertical-align: baseline; white-space: pre-wrap;">Internals of the HBaseSink and AsyncHBaseSink</span><br /><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">The HBaseSink uses the HBase HTable API to write events out to HBase. HTable supports batching of Puts, but only HBase 0.92+ supports batching of Increments. The HBaseSink is currently single-threaded, and calls the serializer to get the Puts and Increments once for each event it processes. Puts and Increments are sent to HBase via blocking calls, so the next event is read and passed to the serializer only once the current event has been successfully written to HBase. Each transaction consists of at most the number of events specified by the batchSize property in the configuration. Like all other Flume sinks, if any of these events fails to be written successfully, the sink retries the entire transaction.</span><br /><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"></span><br /><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">The AsyncHBaseSink, on the other hand, uses the asynchbase API and sends events out to HBase asynchronously. Like the HBaseSink, it generates Puts and Increments for each event, but once they are generated, the sink sends them out immediately and moves on to process the next event. Success or failure is handled through callbacks. Again, each transaction consists of at most the number of events specified by the batchSize configuration parameter. 
The sink waits until either success callbacks are received for all the events sent, or at least one error callback is received. If an error callback is received, the entire transaction is retried, in true Flume style. </span><br /><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"></span><br /><span style="font-size: 15px; background-color: transparent; font-weight: bold; vertical-align: baseline; white-space: pre-wrap;">A word of caution</span><br /><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">As you can see, if HBase reports failure to write even one Put or Increment, the entire transaction is retried - this is how Flume&rsquo;s at-least-once semantics work, and most Flume sinks operate the same way. In the case of HBase Increments, this means the same event could cause a counter to be incremented more than once, which is worth keeping in mind when using Flume to perform Increments. Also, if the serializer is not idempotent, the same event can cause multiple different Puts to be written to HBase. Imagine a case where each event represents a credit card transaction: if the same event can generate different Puts each time, HBase could end up with multiple records of the same transaction, which is probably not desired.</span><br /><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"></span><br /><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">The AsyncHBaseSink is known to give better performance than the HBaseSink, primarily because of the non-blocking nature of the underlying API it uses. 
The HBase community is working on improving the performance of the HBase client API, which would vastly improve HBaseSink performance as well.</span><br /><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;"></span><br /><span style="font-size: 15px; background-color: transparent; font-weight: bold; vertical-align: baseline; white-space: pre-wrap;">Conclusion</span><br /><span style="font-size: 15px; background-color: transparent; vertical-align: baseline; white-space: pre-wrap;">Flume is an excellent tool for writing events out to the different storage systems in the Hadoop ecosystem, including HBase. The HBase sinks provide the functionality to write data to HBase in your own schema, and allow the user to &ldquo;map&rdquo; the Flume event to HBase data.</span></font><br /></strong> </p>
</div><a class="u-url" href="/flume/entry/streaming_data_into_apache_hbase" hidden></a>
</article>
</div>
</main><footer class="site-footer h-card">
<data class="u-url" href="/"></data>
<div class="wrapper">
<h2 class="footer-heading">Blogs Archive</h2>
<div class="footer-col-wrapper">
<div class="footer-col footer-col-1">
<ul class="contact-list">
<li class="p-name">Blogs Archive</li><li><a class="u-email" href="mailto:issues@infra.apache.org">issues@infra.apache.org</a></li></ul>
</div>
<div class="footer-col footer-col-2"><ul class="social-media-list"><li><a href="https://github.com/jekyll"><svg class="svg-icon"><use xlink:href="/assets/minima-social-icons.svg#github"></use></svg> <span class="username">jekyll</span></a></li><li><a href="https://www.twitter.com/jekyllrb"><svg class="svg-icon"><use xlink:href="/assets/minima-social-icons.svg#twitter"></use></svg> <span class="username">jekyllrb</span></a></li></ul>
</div>
<div class="footer-col footer-col-3">
<p>This is an archive of the Roller blogs that were previously hosted on blogs.apache.org</p>
</div>
</div>
</div>
</footer>
</body>
</html>