<!DOCTYPE html>
<html lang="en">
  <head>
    <meta charset="utf-8" />
    <meta http-equiv="X-UA-Compatible" content="IE=edge" />
    <meta name="viewport" content="width=device-width, initial-scale=1" />
    <!-- The above 3 meta tags *must* come first in the head; any other head content must come *after* these tags -->
    <meta name="description" content="A new open source Apache Hadoop ecosystem project, Apache Kudu completes Hadoop's storage layer to enable fast analytics on fast data" />
    <meta name="author" content="Cloudera" />
    <title>Apache Kudu - An Introduction to the Flume Kudu Sink</title>
    <!-- Bootstrap core CSS -->
    <link rel="stylesheet" href="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.6/css/bootstrap.min.css"
          integrity="sha384-1q8mTJOASx8j1Au+a5WDVnPi2lkFfwwEAa8hDDdjZlpLegxhjVME1fgjWPGmkzs7"
          crossorigin="anonymous">

    <!-- Custom styles for this template -->
    <link href="/css/kudu.css" rel="stylesheet"/>
    <link href="/css/asciidoc.css" rel="stylesheet"/>
    <link rel="shortcut icon" href="/img/logo-favicon.ico" />
    <link rel="stylesheet" href="https://maxcdn.bootstrapcdn.com/font-awesome/4.6.1/css/font-awesome.min.css" />

    
    <link rel="alternate" type="application/atom+xml"
      title="RSS Feed for Apache Kudu blog"
      href="/feed.xml" />
    

    <!-- HTML5 shim and Respond.js for IE8 support of HTML5 elements and media queries -->
    <!--[if lt IE 9]>
        <script src="https://oss.maxcdn.com/html5shiv/3.7.2/html5shiv.min.js"></script>
        <script src="https://oss.maxcdn.com/respond/1.4.2/respond.min.js"></script>
        <![endif]-->
  </head>
  <body>
    <div class="kudu-site container-fluid">
      <!-- Static navbar -->
        <nav class="navbar navbar-default">
          <div class="container-fluid">
            <div class="navbar-header">
              <button type="button" class="navbar-toggle collapsed" data-toggle="collapse" data-target="#navbar" aria-expanded="false" aria-controls="navbar">
                <span class="sr-only">Toggle navigation</span>
                <span class="icon-bar"></span>
                <span class="icon-bar"></span>
                <span class="icon-bar"></span>
              </button>
              
              <a class="logo" href="/"><img
                src="//d3dr9sfxru4sde.cloudfront.net/i/k/apachekudu_logo_0716_80px.png"
                srcset="//d3dr9sfxru4sde.cloudfront.net/i/k/apachekudu_logo_0716_80px.png 1x, //d3dr9sfxru4sde.cloudfront.net/i/k/apachekudu_logo_0716_160px.png 2x"
                alt="Apache Kudu"/></a>
              
            </div>
            <div id="navbar" class="collapse navbar-collapse">
              <ul class="nav navbar-nav navbar-right">
                <li >
                  <a href="/">Home</a>
                </li>
                <li >
                  <a href="/overview.html">Overview</a>
                </li>
                <li >
                  <a href="/docs/">Documentation</a>
                </li>
                <li >
                  <a href="/releases/">Download</a>
                </li>
                <li class="active">
                  <a href="/blog/">Blog</a>
                </li>
                <!-- NOTE: this dropdown menu does not appear on Mobile, so don't add anything here
                     that doesn't also appear elsewhere on the site. -->
                <li class="dropdown">
                  <a href="/community.html" role="button" aria-haspopup="true" aria-expanded="false">Community <span class="caret"></span></a>
                  <ul class="dropdown-menu">
                    <li class="dropdown-header">GET IN TOUCH</li>
                    <li><a class="icon email" href="/community.html">Mailing Lists</a></li>
                    <li><a class="icon slack" href="https://getkudu-slack.herokuapp.com/">Slack Channel</a></li>
                    <li role="separator" class="divider"></li>
                    <li><a href="/community.html#meetups-user-groups-and-conference-presentations">Events and Meetups</a></li>
                    <li><a href="/committers.html">Project Committers</a></li>
                    <!--<li><a href="/roadmap.html">Roadmap</a></li>-->
                    <li><a href="/community.html#contributions">How to Contribute</a></li>
                    <li role="separator" class="divider"></li>
                    <li class="dropdown-header">DEVELOPER RESOURCES</li>
                    <li><a class="icon github" href="https://github.com/apache/kudu">GitHub</a></li>
                    <li><a class="icon gerrit" href="http://gerrit.cloudera.org:8080/#/q/status:open+project:kudu">Gerrit Code Review</a></li>
                    <li><a class="icon jira" href="https://issues.apache.org/jira/browse/KUDU">JIRA Issue Tracker</a></li>
                    <li role="separator" class="divider"></li>
                    <li class="dropdown-header">SOCIAL MEDIA</li>
                    <li><a class="icon twitter" href="https://twitter.com/ApacheKudu">Twitter</a></li>
                    <li><a href="https://www.reddit.com/r/kudu/">Reddit</a></li>
                    <li role="separator" class="divider"></li>
                    <li class="dropdown-header">APACHE SOFTWARE FOUNDATION</li>
                    <li><a href="https://www.apache.org/security/" target="_blank">Security</a></li>
                    <li><a href="https://www.apache.org/foundation/sponsorship.html" target="_blank">Sponsorship</a></li>
                    <li><a href="https://www.apache.org/foundation/thanks.html" target="_blank">Thanks</a></li>
                    <li><a href="https://www.apache.org/licenses/" target="_blank">License</a></li>
                  </ul>
                </li>
                <li >
                  <a href="/faq.html">FAQ</a>
                </li>
              </ul><!-- /.nav -->
            </div><!-- /#navbar -->
          </div><!-- /.container-fluid -->
        </nav>

<div class="row header">
  <div class="col-lg-12">
    <h2><a href="/blog">Apache Kudu Blog</a></h2>
  </div>
</div>

<div class="row-fluid">
  <div class="col-lg-9">
    <article>
  <header>
    <h1 class="entry-title">An Introduction to the Flume Kudu Sink</h1>
    <p class="meta">Posted 31 Aug 2016 by Ara Abrahamian</p>
  </header>
  <div class="entry-content">
    <p>This post discusses the Kudu Flume Sink. First, I’ll give some background on why we considered
using Kudu, what Flume does for us, and how Flume fits with Kudu in our project.</p>

<h2 id="why-kudu">Why Kudu</h2>

<p>Traditionally in the Hadoop ecosystem we’ve dealt with various <em>batch processing</em> technologies such
as MapReduce and the many libraries and tools built on top of it in various languages (Apache Pig,
Apache Hive, Apache Oozie and many others). The main problem with this approach is that it needs to
process the whole data set in batches, again and again, as soon as new data gets added. Things get
really complicated when a few such tasks need to get chained together, or when the same data set
needs to be processed in various ways by different jobs, while all compete for the shared cluster
resources.</p>

<p>The opposite of this approach is <em>stream processing</em>: process the data as soon as it arrives, not
in batches. Streaming systems such as Spark Streaming, Storm, Kafka Streams, and many others make
this possible. But writing streaming services is not trivial. The streaming systems are becoming
more and more capable and support more complex constructs, but they are not yet easy to use. All
queries and processes need to be carefully planned and implemented.</p>

<p>To summarize, <em>batch processing</em> is:</p>

<ul>
  <li>file-based</li>
  <li>a paradigm that processes large chunks of data as a group</li>
  <li>high latency and high throughput, both for ingest and query</li>
  <li>typically easy to program, but hard to orchestrate</li>
  <li>well suited for writing ad-hoc queries, although they are typically high latency</li>
</ul>

<p>While <em>stream processing</em> is:</p>

<ul>
  <li>a totally different paradigm, which involves single events and time windows instead of large groups of events</li>
  <li>still file-based and not a long-term database</li>
  <li>not batch-oriented, but incremental</li>
  <li>ultra-fast ingest and ultra-fast query (query results basically pre-calculated)</li>
  <li>not so easy to program, relatively easy to orchestrate</li>
  <li>impossible to write ad-hoc queries</li>
</ul>

<p>And a Kudu-based <em>near real-time</em> approach is:</p>

<ul>
  <li>flexible and expressive, thanks to SQL support via Apache Impala (incubating)</li>
  <li>a table-oriented, mutable data store that feels like a traditional relational database</li>
  <li>very easy to program, you can even pretend it’s good old MySQL</li>
  <li>low-latency and relatively high throughput, both for ingest and query</li>
</ul>

<p>At Argyle Data, we’re dealing with complex fraud detection scenarios. We need to ingest massive
amounts of data, run machine learning algorithms and generate reports. When we created our current
architecture two years ago we decided to opt for a database as the backbone of our system. That
database is Apache Accumulo. It’s a key-value based database which runs on top of Hadoop HDFS,
quite similar to HBase but with some important improvements such as cell level security and ease
of deployment and management. To enable querying of this data for quite complex reporting and
analytics, we used Presto, a distributed query engine with a pluggable architecture open-sourced
by Facebook. We wrote a connector for it to let it run queries against the Accumulo database. This
architecture has served us well, but there were a few problems:</p>

<ul>
  <li>we need to ingest even more massive volumes of data in real-time</li>
  <li>we need to perform complex machine-learning calculations on even larger data-sets</li>
  <li>we need to support ad-hoc queries, plus long-term data warehouse functionality</li>
</ul>

<p>So, we’ve started gradually moving the core machine-learning pipeline to a streaming based
solution. This way we can ingest and process larger data-sets faster, in real time. But then how
would we take care of ad-hoc queries and long-term persistence? This is where Kudu comes in. While
the machine learning pipeline ingests and processes real-time data, we store a copy of the same
ingested data in Kudu for long-term access and ad-hoc queries. Kudu is our <em>data warehouse</em>. By
using Kudu and Impala, we can retire our in-house Presto connector and rely on Impala’s
super-fast query engine.</p>

<p>But how would we make sure data is reliably ingested into the streaming pipeline <em>and</em> the
Kudu-based data warehouse? This is where Apache Flume comes in.</p>

<h2 id="why-flume">Why Flume</h2>

<p>According to their <a href="http://flume.apache.org/">website</a> “Flume is a distributed, reliable, and
available service for efficiently collecting, aggregating, and moving large amounts of log data.
It has a simple and flexible architecture based on streaming data flows. It is robust and fault
tolerant with tunable reliability mechanisms and many failover and recovery mechanisms.” As you
can see, nowhere is Hadoop mentioned but Flume is typically used for ingesting data to Hadoop
clusters.</p>

<p><img src="https://blogs.apache.org/flume/mediaresource/ab0d50f6-a960-42cc-971e-3da38ba3adad" alt="Apache Flume architecture diagram" /></p>

<p>Flume has an extensible architecture. An instance of Flume, called an <em>agent</em>, can have multiple
<em>channels</em>, with each having multiple <em>sources</em> and <em>sinks</em> of various types. Sources queue data
in channels, which in turn write out data to sinks. Such <em>pipelines</em> can be chained together to
create even more complex ones. There may be more than one agent and agents can be configured to
support failover and recovery.</p>

<p>Flume comes with a bunch of built-in types of channels, sources and sinks. Memory channel is the
default (an in-memory queue with no persistence to disk), but other options such as Kafka- and
File-based channels are also provided. As for the sources, Avro, JMS, Thrift, and spooling directory
sources are some of the built-in ones. Flume also ships with many sinks, including sinks for writing
data to HDFS, HBase, Hive, Kafka, as well as to other Flume agents.</p>

<p>In the rest of this post I’ll go over the Kudu Flume sink and show you how to configure Flume to
write ingested data to a Kudu table. The sink has been part of the Kudu distribution since the 0.8
release and the source code can be found <a href="https://github.com/apache/kudu/tree/master/java/kudu-flume-sink">here</a>.</p>

<h2 id="configuring-the-kudu-flume-sink">Configuring the Kudu Flume Sink</h2>

<p>Here is a sample flume configuration file:</p>

<pre><code>agent1.sources  = source1
agent1.channels = channel1
agent1.sinks = sink1

agent1.sources.source1.type = exec
agent1.sources.source1.command = /usr/bin/vmstat 1
agent1.sources.source1.channels = channel1

agent1.channels.channel1.type = memory
agent1.channels.channel1.capacity = 10000
agent1.channels.channel1.transactionCapacity = 1000

agent1.sinks.sink1.type = org.apache.kudu.flume.sink.KuduSink
agent1.sinks.sink1.masterAddresses = localhost
agent1.sinks.sink1.tableName = stats
agent1.sinks.sink1.channel = channel1
agent1.sinks.sink1.batchSize = 50
agent1.sinks.sink1.producer = org.apache.kudu.flume.sink.SimpleKuduEventProducer
</code></pre>

<p>We define a source called <code>source1</code> which simply executes a <code>vmstat</code> command to continuously generate
virtual memory statistics for the machine and queue events into an in-memory <code>channel1</code> channel,
which in turn is used for writing these events to a Kudu table called <code>stats</code>. We are using
<code>org.apache.kudu.flume.sink.SimpleKuduEventProducer</code> as the producer. <code>SimpleKuduEventProducer</code> is
the built-in and default producer, but it’s implemented as a showcase for how to write Flume
events into Kudu tables. For any serious functionality we’d have to write a custom producer. We
need to make this producer and the <code>KuduSink</code> class available to Flume. We can do that by simply
copying the <code>kudu-flume-sink-&lt;VERSION&gt;.jar</code> jar file from the Kudu distribution to the
<code>$FLUME_HOME/plugins.d/kudu-sink/lib</code> directory in the Flume installation. The jar file contains
<code>KuduSink</code> and all of its dependencies (including Kudu java client classes).</p>

<p>At a minimum, the Kudu Flume Sink needs to know where the Kudu masters are
(<code>agent1.sinks.sink1.masterAddresses = localhost</code>) and which Kudu table should be used for writing
Flume events to (<code>agent1.sinks.sink1.tableName = stats</code>). The Kudu Flume Sink doesn’t create this
table; it has to be created before the Kudu Flume Sink is started.</p>

<p>You may also notice the <code>batchSize</code> parameter. Batch size is used for batching up to that many
Flume events and flushing the entire batch in one shot. Tuning batchSize properly can have a huge
impact on ingest performance of the Kudu cluster.</p>

<p>Here is a complete list of KuduSink parameters:</p>

<table>
  <thead>
    <tr>
      <th>Parameter Name</th>
      <th>Default</th>
      <th>Description</th>
    </tr>
  </thead>
  <tbody>
    <tr>
      <td>masterAddresses</td>
      <td>N/A</td>
      <td>Comma-separated list of “host:port” pairs of the masters (port optional)</td>
    </tr>
    <tr>
      <td>tableName</td>
      <td>N/A</td>
      <td>The name of the table in Kudu to write to</td>
    </tr>
    <tr>
      <td>producer</td>
      <td>org.apache.kudu.flume.sink.SimpleKuduEventProducer</td>
      <td>The fully qualified class name of the Kudu event producer the sink should use</td>
    </tr>
    <tr>
      <td>batchSize</td>
      <td>100</td>
      <td>Maximum number of events the sink should take from the channel per transaction, if available</td>
    </tr>
    <tr>
      <td>timeoutMillis</td>
      <td>30000</td>
      <td>Timeout period for Kudu operations, in milliseconds</td>
    </tr>
    <tr>
      <td>ignoreDuplicateRows</td>
      <td>true</td>
      <td>Whether to ignore errors indicating that we attempted to insert duplicate rows into Kudu</td>
    </tr>
  </tbody>
</table>

<p>Let’s take a look at the source code for the built-in producer class:</p>

<pre><code class="language-java">public class SimpleKuduEventProducer implements KuduEventProducer {
  private byte[] payload;
  private KuduTable table;
  private String payloadColumn;

  public SimpleKuduEventProducer(){
  }

  @Override
  public void configure(Context context) {
    payloadColumn = context.getString("payloadColumn","payload");
  }

  @Override
  public void configure(ComponentConfiguration conf) {
  }

  @Override
  public void initialize(Event event, KuduTable table) {
    this.payload = event.getBody();
    this.table = table;
  }

  @Override
  public List&lt;Operation&gt; getOperations() throws FlumeException {
    try {
      Insert insert = table.newInsert();
      PartialRow row = insert.getRow();
      row.addBinary(payloadColumn, payload);

      return Collections.singletonList((Operation) insert);
    } catch (Exception e){
      throw new FlumeException("Failed to create Kudu Insert object!", e);
    }
  }

  @Override
  public void close() {
  }
}
</code></pre>

<p><code>SimpleKuduEventProducer</code> implements the <code>org.apache.kudu.flume.sink.KuduEventProducer</code> interface,
which itself looks like this:</p>

<pre><code class="language-java">public interface KuduEventProducer extends Configurable, ConfigurableComponent {
  /**
   * Initialize the event producer.
   * @param event to be written to Kudu
   * @param table the KuduTable object used for creating Kudu Operation objects
   */
  void initialize(Event event, KuduTable table);

  /**
   * Get the operations that should be written out to Kudu as a result of this
   * event. This list is written to Kudu using the Kudu client API.
   * @return List of {@link org.apache.kudu.client.Operation} which
   * are written as such to Kudu
   */
  List&lt;Operation&gt; getOperations();

  /*
   * Clean up any state. This will be called when the sink is being stopped.
   */
  void close();
}
</code></pre>

<p><code>public void configure(Context context)</code> is called when an instance of our producer is instantiated
by the KuduSink. SimpleKuduEventProducer’s implementation looks for a producer parameter named
<code>payloadColumn</code> and uses its value (“payload” if not overridden in Flume configuration file) as the
column which will hold the value of the Flume event payload. If you recall from above, we had
configured the KuduSink to listen for events generated from the <code>vmstat</code> command. Each output row
from that command will be stored as a new row containing a <code>payload</code> column in the <code>stats</code> table.
<code>SimpleKuduEventProducer</code> has no other configuration parameters. Producer parameters are
defined by prefixing them with <code>producer.</code> (<code>agent1.sinks.sink1.producer.payloadColumn</code> for
example).</p>

<p>The main producer logic resides in the <code>public List&lt;Operation&gt; getOperations()</code> method. In
SimpleKuduEventProducer’s implementation we simply insert the binary body of the Flume event into
the Kudu table. Here we call Kudu’s <code>newInsert()</code> to initiate an insert, but could have used
<code>Upsert</code> if updating an existing row was also an option. In fact, there’s another producer
implementation available for doing just that: <code>SimpleKeyedKuduEventProducer</code>. Most probably you
will need to write your own custom producer in the real world, but you can base your implementation
on the built-in ones.</p>

<p>In the future, we plan to add more flexible event producer implementations so that creation of a
custom event producer is not required to write data to Kudu. See
<a href="https://gerrit.cloudera.org/#/c/4034/">here</a> for a work-in-progress generic event producer for
Avro-encoded Events.</p>

<h2 id="conclusion">Conclusion</h2>

<p>Kudu is a scalable data store which lets us ingest insane amounts of data per second. Apache Flume
helps us aggregate data from various sources, and the Kudu Flume Sink lets us easily store
the aggregated Flume events into Kudu. Together they enable us to create a data warehouse out of
disparate sources.</p>

<p><em>Ara Abrahamian is a software engineer at Argyle Data building fraud detection systems using
sophisticated machine learning methods. Ara is the original author of the Flume Kudu Sink that
is included in the Kudu distribution. You can follow him on Twitter at
<a href="https://twitter.com/ara_e">@ara_e</a>.</em></p>

  </div>
</article>


  </div>
  <div class="col-lg-3 recent-posts">
    <h3>Recent posts</h3>
    <ul>
    
      <li> <a href="/2019/11/20/apache-kudu-1-11-1-release.html">Apache Kudu 1.11.1 released</a> </li>
    
      <li> <a href="/2019/11/20/apache-kudu-1-10-1-release.html">Apache Kudu 1.10.1 released</a> </li>
    
      <li> <a href="/2019/07/09/apache-kudu-1-10-0-release.html">Apache Kudu 1.10.0 Released</a> </li>
    
      <li> <a href="/2019/04/30/location-awareness.html">Location Awareness in Kudu</a> </li>
    
      <li> <a href="/2019/04/22/fine-grained-authorization-with-apache-kudu-and-impala.html">Fine-Grained Authorization with Apache Kudu and Impala</a> </li>
    
      <li> <a href="/2019/03/19/testing-apache-kudu-applications-on-the-jvm.html">Testing Apache Kudu Applications on the JVM</a> </li>
    
      <li> <a href="/2019/03/15/apache-kudu-1-9-0-release.html">Apache Kudu 1.9.0 Released</a> </li>
    
      <li> <a href="/2019/03/05/transparent-hierarchical-storage-management-with-apache-kudu-and-impala.html">Transparent Hierarchical Storage Management with Apache Kudu and Impala</a> </li>
    
      <li> <a href="/2018/12/11/call-for-posts.html">Call for Posts</a> </li>
    
      <li> <a href="/2018/10/26/apache-kudu-1-8-0-released.html">Apache Kudu 1.8.0 Released</a> </li>
    
      <li> <a href="/2018/09/26/index-skip-scan-optimization-in-kudu.html">Index Skip Scan Optimization in Kudu</a> </li>
    
      <li> <a href="/2018/09/11/simplified-pipelines-with-kudu.html">Simplified Data Pipelines with Kudu</a> </li>
    
      <li> <a href="/2018/08/06/getting-started-with-kudu-an-oreilly-title.html">Getting Started with Kudu - an O'Reilly Title</a> </li>
    
      <li> <a href="/2018/07/10/instrumentation-in-kudu.html">Instrumentation in Apache Kudu</a> </li>
    
      <li> <a href="/2018/03/23/apache-kudu-1-7-0-released.html">Apache Kudu 1.7.0 released</a> </li>
    
    </ul>
  </div>
</div>

      <footer class="footer">
        <div class="row">
          <div class="col-md-9">
            <p class="small">
            Copyright &copy; 2019 The Apache Software Foundation. 
            </p>
            <p class="small">
            Apache Kudu, Kudu, Apache, the Apache feather logo, and the Apache Kudu
            project logo are either registered trademarks or trademarks of The
            Apache Software Foundation in the United States and other countries.
            </p>
          </div>
          <div class="col-md-3">
            <a class="pull-right" href="https://www.apache.org/events/current-event.html">
                <img src="https://www.apache.org/events/current-event-234x60.png" alt="Current Apache Software Foundation event"/>
            </a>
          </div>
        </div>
      </footer>
    </div>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/jquery/1.11.3/jquery.min.js"></script>
    <script>
      // Once the DOM is ready, tag the root <html> element with either a
      // "touch" or a "no-touch" class, depending on whether the element exposes
      // an "ontouchstart" property. This is only a heuristic: many laptops
      // have touch screens too.
      $(document).ready(function() {
        var root = document.documentElement;
        var touchClass = ("ontouchstart" in root) ? "touch" : "no-touch";
        $(root).addClass(touchClass);
      });
    </script>
    <script src="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.6/js/bootstrap.min.js"
            integrity="sha384-0mSbJDEHialfmuBBQP6A4Qrprq5OVfW37PRR3j5ELqxss1yVqOtnepnHVP9aJ7xS"
            crossorigin="anonymous"></script>
    <script>
      // Google Analytics (analytics.js) bootstrap. The self-invoking function:
      //   - records the name of the command function ('ga') on window as
      //     GoogleAnalyticsObject,
      //   - defines window.ga (unless it already exists) as a stub that pushes
      //     its arguments onto the queue ga.q,
      //   - stores the current timestamp in ga.l,
      //   - creates an async <script> element for analytics.js and inserts it
      //     before the first <script> element in the document.
      (function(i,s,o,g,r,a,m){i['GoogleAnalyticsObject']=r;i[r]=i[r]||function(){
      (i[r].q=i[r].q||[]).push(arguments)},i[r].l=1*new Date();a=s.createElement(o),
      m=s.getElementsByTagName(o)[0];a.async=1;a.src=g;m.parentNode.insertBefore(a,m)
      })(window,document,'script','//www.google-analytics.com/analytics.js','ga');

      // Issue the 'create' command for property UA-68448017-1, then send a
      // 'pageview' hit for this page.
      ga('create', 'UA-68448017-1', 'auto');
      ga('send', 'pageview');
    </script>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/anchor-js/3.1.0/anchor.js"></script>
    <script>
      // Configure the global `anchors` object (provided by the anchor.js script
      // loaded just above): link placement is 'right' and the visibility mode is
      // 'touch' (see the AnchorJS docs for the exact semantics of each option).
      anchors.options = { placement: 'right', visible: 'touch' };
      // Generate the anchor links; no selector is passed, so the library's
      // default target selection applies.
      anchors.add();
    </script>
  </body>
</html>

