% THIS IS AN EXAMPLE DOCUMENT FOR VLDB 2012
% based on ACM SIGPROC-SP.TEX VERSION 2.7
% Modified by Gerald Weber <gerald@cs.auckland.ac.nz>
% Removed the requirement to include *bbl file in here. (AhmetSacan, Sep2012)
% Fixed the equation on page 3 to prevent line overflow. (AhmetSacan, Sep2012)
\documentclass{vldb}
\usepackage{graphicx}
\usepackage{balance} % for \balance command ON LAST PAGE (only there!)
\usepackage{fontspec}
\setmainfont[Ligatures={TeX}]{Times}
\usepackage{hyperref}
\graphicspath{{figures/}}
\hyphenation{metamarkets nelson}
\begin{document}
% ****************** TITLE ****************************************
\title{Druid: Real-time Analytical Data Store}
% possible, but not really needed or used for PVLDB:
%\subtitle{[Extended Abstract]
%\titlenote{A full version of this paper is available as\textit{Author's Guide to Preparing ACM SIG Proceedings Using \LaTeX$2_\epsilon$\ and BibTeX} at \texttt{www.acm.org/eaddress.htm}}}
% ****************** AUTHORS **************************************
% You need the command \numberofauthors to handle the 'placement
% and alignment' of the authors beneath the title.
%
% For aesthetic reasons, we recommend 'three authors at a time'
% i.e. three 'name/affiliation blocks' be placed beneath the title.
%
% NOTE: You are NOT restricted in how many 'rows' of
% "name/affiliations" may appear. We just ask that you restrict
% the number of 'columns' to three.
%
% Because of the available 'opening page real-estate'
% we ask you to refrain from putting more than six authors
% (two rows with three columns) beneath the article title.
% More than six makes the first-page appear very cluttered indeed.
%
% Use the \alignauthor commands to handle the names
% and affiliations for an 'aesthetic maximum' of six authors.
% Add names, affiliations, addresses for
% the seventh etc. author(s) as the argument for the
% \additionalauthors command.
% These 'additional authors' will be output/set for you
% without further effort on your part as the last section in
% the body of your article BEFORE References or any Appendices.
\numberofauthors{7} % in this file, there are a *total*
% of SEVEN authors. SIX appear on the 'first-page' (for formatting
% reasons) and the remaining one appears in the \additionalauthors section.
\author{
% You can go ahead and credit any number of authors here,
% e.g. one 'row of three' or two rows (consisting of one row of three
% and a second row of one, two or three).
%
% The command \alignauthor (no curly braces needed) should
% precede each author name, affiliation/snail-mail address and
% e-mail address. Additionally, tag each line of
% affiliation/address with \affaddr, and tag the
% e-mail address with \email.
%
% 1st. author
\alignauthor Fangjin Yang\\
\affaddr{Metamarkets, Inc.}\\
\affaddr{625 2nd Street, Suite 230}\\
\affaddr{San Francisco, CA, USA}\\
\email{fangjin@metamarkets.com}
\alignauthor Eric Tschetter\\
\affaddr{Metamarkets, Inc.}\\
\affaddr{625 2nd Street, Suite 230}\\
\affaddr{San Francisco, CA, USA}\\
\email{eric@metamarkets.com}
\alignauthor Gian Merlino\\
\affaddr{Metamarkets, Inc.}\\
\affaddr{625 2nd Street, Suite 230}\\
\affaddr{San Francisco, CA, USA}\\
\email{gian@metamarkets.com}
\and
\alignauthor Nelson Ray\\
\affaddr{Metamarkets, Inc.}\\
\affaddr{625 2nd Street, Suite 230}\\
\affaddr{San Francisco, CA, USA}\\
\email{nelson@metamarkets.com}
\alignauthor Xavier Léauté\\
\affaddr{Metamarkets, Inc.}\\
\affaddr{625 2nd Street, Suite 230}\\
\affaddr{San Francisco, CA, USA}\\
\email{xavier@metamarkets.com}
\alignauthor Deep Ganguli\\
\affaddr{Metamarkets, Inc.}\\
\affaddr{625 2nd Street, Suite 230}\\
\affaddr{San Francisco, CA, USA}\\
\email{deep@metamarkets.com}
}
% There's nothing stopping you putting the seventh, eighth, etc.
% author on the opening page (as the 'third row') but we ask,
% for aesthetic reasons that you place these 'additional authors'
% in the \additional authors block, viz.
\additionalauthors{Michael Driscoll (Metamarkets, \texttt{mike@metamarkets.com})}
\date{21 March 2013}
% Just remember to make sure that the TOTAL number of authors
% is the number that will appear on the first page PLUS the
% number that will appear in the \additionalauthors section.
\maketitle
\begin{abstract}
Druid is a scalable, real-time analytical data store that supports
ad-hoc queries on large-scale data sets. The system combines a
columnar data layout, a shared-nothing architecture, and an advanced
indexing structure to allow arbitrary exploration of billion-row
tables with sub-second latencies. Druid scales horizontally and is the
core engine of the Metamarkets platform. In this paper, we detail the
architecture and implementation of Druid and describe how it solves
the real-time data ingestion problem.
\end{abstract}
\section{Introduction}
In recent years, enterprises have faced ever-growing collections of
data in all forms. The scale of data has reached terabytes, even
petabytes, of information per day. Companies are increasingly realizing
the importance of unlocking the insights contained within this
data. Numerous database vendors such as IBM’s Netezza \cite{singh2011introduction}, Vertica
\cite{bear2012vertica}, and EMC’s Greenplum \cite{miner2012unified} offer data warehousing solutions, and
several research papers \cite{barroso2009datacenter,
chaudhuri1997overview, dewitt1992parallel} directly address this problem as
well. As the interactive data exploration space becomes more mature,
it is apparent that real-time ingestion and exploration of data will
unlock business-critical decisions for front office and back office
analysts.
Metamarkets realized early on that for high data volumes, existing
Relational Database Management Systems (RDBMS) and most NoSQL
architectures were not sufficient to address the performance and
use-case needs of the business intelligence space. Druid was built to
address a gap we believe exists in the current big-data ecosystem for
a real-time analytical data store. Druid is a distributed, columnar,
shared-nothing data store designed to reliably scale to petabytes of
data and thousands of cores. It is a highly available system designed
to run on commodity hardware with no downtime in the face of failures,
data imports or software updates. We have been building Druid over the
course of the last two years and it is the core engine of the
Metamarkets technology stack.
In many ways, Druid shares similarities with other interactive query systems
\cite{melnik2010dremel}, main-memory databases \cite{farber2012sap}, and widely-known distributed data
stores such as BigTable \cite{chang2008bigtable}, Dynamo \cite{decandia2007dynamo}, and Cassandra \cite{lakshman2010cassandra}. Unlike
most traditional data stores, Druid operates mainly on read-only data
and has limited functionality for writes. The system is highly optimized
for large-scale transactional data aggregation and arbitrarily deep data exploration. Druid is highly configurable
and allows users to adjust levels of fault tolerance and
performance.
Druid builds on the ideas of other distributed data stores, real-time
computation engines, and search engine indexing algorithms. In this
paper, we make the following contributions to academia:
\begin{itemize}
\item We outline Druid’s real-time ingestion and query capabilities
and explain how we can explore events within milliseconds of their
creation.
\item We describe how the architecture allows for fast and flexible
queries and how inverted indices can be applied to quickly filter
data.
\item We present experiments benchmarking Druid’s performance.
\end{itemize}
The format of the paper is as follows: Section \ref{sec:data-model} describes the Druid
data model. Section \ref{sec:cluster} presents an overview of the components of a
Druid cluster. Section \ref{sec:query-api} outlines the query API. Section \ref{sec:storage} describes
data storage format in greater detail. Section \ref{sec:robustness} discusses Druid
robustness and failure responsiveness. Section \ref{sec:benchmarks} provides our
performance benchmarks. Section \ref{sec:related} lists the related work and Section \ref{sec:conclusions} presents
our conclusions.
\section{Data Model}
\label{sec:data-model}
The fundamental storage unit in Druid is the segment. Distribution and
replication in Druid are always done at a segment level. Segments
encapsulate a partition of a larger transactional data set. To better
understand how a typical row/column collection of data translates into
a Druid segment, consider the data set shown in Table~\ref{tab:sample_data}.
\begin{table*}
\centering
\caption{Sample Druid data}
\label{tab:sample_data}
\texttt{
\begin{tabular}{ l l l l l l l l }
\hline
\textbf{Timestamp} & \textbf{Publisher} & \textbf{Advertiser} & \textbf{Gender} & \textbf{Country} & \textbf{Impressions} & \textbf{Clicks} & \textbf{Revenue} \\
2011-01-01T01:00:00Z & bieberfever.com & google.com & Male & USA & 1800 & 25 & 15.70 \\
2011-01-01T01:00:00Z & bieberfever.com & google.com & Male & USA & 2912 & 42 & 29.18 \\
2011-01-01T02:00:00Z & ultratrimfast.com & google.com & Male & USA & 1953 & 17 & 17.31 \\
2011-01-01T02:00:00Z & ultratrimfast.com & google.com & Male & USA & 3194 & 170 & 34.01 \\
\end{tabular}
}
\end{table*}
A segment is composed of multiple binary files, each representing a
column of a data set. The data set in Table~\ref{tab:sample_data} consists of 8 distinct
columns, one of which is the timestamp column. Druid always requires a
timestamp column because it (currently) only operates with event-based
data. Segments always represent some time interval and each column
file contains the specific values for that column over the time
interval. Since segments always contain data for a time range, it is
logical that Druid partitions data into smaller chunks based on the
timestamp value. In other words, segments can be thought of as blocks
of data that represent a certain granularity of time. For example, if
we wanted to shard the data in Table~\ref{tab:sample_data} to an hourly granularity, the
partitioning algorithm would result in two segments, one representing
each hour of 2011-01-01. Similarly, if we sharded the data to a daily
granularity, we would create a single segment for 2011-01-01.
Partitioning the data based on granularity buckets allows users to
fine tune the degree of parallelization they want in Druid. A data set
representing a year’s worth of data may be bucketed by day, and a data
set representing only a day’s worth of data may be partitioned by
hour. Sharding on a single dimension (time) may still result in segments that are too large to manage if the data volume is sufficiently high.
To create more operable partition chunks, Druid may
additionally shard data based on other factors such as dimension
cardinality. Each shard creates a segment and hence, segments are uniquely identified by a data source
id describing the data, the time interval of the data, a version
indicating when the segment was created, and a shard partition number.
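To make the time-based partitioning concrete, the following Python sketch buckets the rows of Table~\ref{tab:sample_data} by a chosen granularity and derives one identifier per bucket. It is only an illustration: the helper names (\texttt{bucket\_start}, \texttt{partition}, \texttt{segment\_id}), the identifier layout, and the version string are our assumptions and do not reflect Druid's actual segment format.
\begin{verbatim}
from collections import defaultdict
from datetime import datetime, timedelta

FMT = "%Y-%m-%dT%H:%M:%SZ"
STEP = {"hour": timedelta(hours=1),
        "day": timedelta(days=1)}

# (timestamp, publisher) pairs from the table
ROWS = [
  ("2011-01-01T01:00:00Z", "bieberfever.com"),
  ("2011-01-01T01:00:00Z", "bieberfever.com"),
  ("2011-01-01T02:00:00Z", "ultratrimfast.com"),
  ("2011-01-01T02:00:00Z", "ultratrimfast.com"),
]

def bucket_start(ts, granularity):
    # Truncate a timestamp to its bucket start.
    t = datetime.strptime(ts, FMT)
    if granularity == "hour":
        return t.replace(minute=0, second=0)
    if granularity == "day":
        return t.replace(hour=0, minute=0,
                         second=0)
    raise ValueError(granularity)

def partition(rows, granularity):
    # Group rows by the bucket they fall into.
    buckets = defaultdict(list)
    for row in rows:
        key = bucket_start(row[0], granularity)
        buckets[key].append(row)
    return dict(buckets)

def segment_id(source, start, granularity,
               version, shard):
    # Hypothetical layout, not Druid's format:
    # source, interval, version, shard number.
    end = start + STEP[granularity]
    parts = [source, start.strftime(FMT),
             end.strftime(FMT), version,
             str(shard)]
    return "_".join(parts)

hourly = partition(ROWS, "hour")  # two buckets
daily = partition(ROWS, "day")    # one bucket
\end{verbatim}
With hourly granularity the four sample rows fall into two buckets, and with daily granularity they fall into one, matching the description above.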
Data in Druid can be conceptually thought of as being either real-time
or historical. Real-time data refers to recently ingested data;
typically, in a production setting, this will be data for the current
hour. Historical data refers to any data that is older. Segments can
be similarly classified as being either real-time or historical.
Historical segments are immutable and do not support insert, delete,
or update semantics. By maintaining immutable blocks of data within
the system, we can maintain a consistent snapshot of historical
data and provide read consistency without having to worry about
concurrent updates and deletes. If updates to a historical segment are
required, we build a new segment for the same data source and time
interval with the updated data. This new segment will have an updated version identifier.
Multiple segments for the same data source and time range
may exist in the system at any time. To provide read consistency,
Druid read operations always access data in a particular time range
from the segment with the latest version identifier for that time
range. Historical segments may be stored on local disk or in a
key-value “deep” store such as S3 \cite{decandia2007dynamo} or HDFS \cite{shvachko2010hadoop}. All historical
segments have associated metadata describing properties of the segment
such as size in bytes, compression format, and location in deep
storage.
Real-time segments are mutable and generally represent a much shorter
duration of time than historical segments. Real-time segments contain
recent data and are incrementally populated as new events are
ingested. On a periodic basis, real-time segments are converted into
historical segments. Additional details about this conversion process
are given in Section~\ref{sec:realtime}.
\section{Cluster}
\label{sec:cluster}
A Druid cluster consists of different types of nodes, each performing
a specific function. The composition of a Druid cluster is shown in
Figure~\ref{fig:druid_cluster}.
\begin{figure*}
\centering
\includegraphics[width = 6in]{druid_cluster}
\caption{An overview of a Druid cluster.}
\label{fig:druid_cluster}
\end{figure*}
Recall that data in Druid is classified as either real-time or
historical. The Druid cluster is architected to reflect this
conceptual separation of data. Real-time nodes are responsible for
ingesting, storing, and responding to queries for the most recent
events. Similarly, historical compute nodes are responsible for
loading and responding to queries for historical events.
Data in Druid is stored on storage nodes. Storage nodes can be either
compute or real-time nodes. Queries to access this data will
typically first hit a layer of broker nodes. Broker nodes are
responsible for finding and routing queries down to the storage nodes
that host the pertinent data. The storage nodes compute their portion
of the query response in parallel and return their results to the
brokers. Broker nodes, compute nodes, and real-time nodes can all be
classified as queryable nodes.
Druid also has a set of coordination nodes to manage load assignment,
distribution, and replication. Coordination nodes are not queryable
and instead focus on maintaining cluster stability. Coordination nodes
have an external dependency on a MySQL database.
Druid relies on an Apache Zookeeper \cite{hunt2010zookeeper} cluster
for coordination. This dependency is required because there is no
direct coordination-related communication between Druid nodes. The
following sections will discuss each Druid component in greater
detail.
\subsection{Apache Zookeeper}
Zookeeper is a service for coordinating processes of distributed
applications. Zookeeper provides connecting applications an
abstraction of a hierarchy of data nodes known as znodes. Each znode
is part of a hierarchical namespace, similar to file
systems. Zookeeper has the concept of ephemeral and permanent
znodes. Permanent nodes must be created and destroyed explicitly by a
connecting application. Ephemeral znodes can be created by connecting
applications and deleted either explicitly or if the session that
created the znode is terminated (such as in the event of service
failure).
\subsection{Historical Compute Nodes}
Historical compute nodes are the main workers of a Druid cluster and
are self-contained and self-sufficient. Compute nodes load historical
segments from permanent/deep storage and expose them for
querying. There is no single point of contention between the nodes and
nodes have no knowledge of one another. Compute nodes are
operationally simple; they only know how to perform the tasks they are
assigned. To help other services discover compute nodes and the data
they hold, every compute node maintains a constant Zookeeper
connection. Compute nodes announce their online state and the segments
they serve by creating ephemeral nodes under specifically configured
Zookeeper paths. Instructions for a given compute node to load new
segments or drop existing segments are sent by creating ephemeral
znodes under a special “load queue” path associated with the compute
node.
To expose a segment for querying, a compute node must first possess a
local copy of the segment. Before a compute node downloads a segment
from deep storage, it first checks a local disk directory (cache) to
see if the segment already exists in local storage. If no cache
information about the segment is present, the compute node will
download metadata about the segment from Zookeeper. This metadata
includes information about where the segment is located in deep
storage and about how to decompress and process the segment. Once a
compute node completes processing a segment, the node announces (in
Zookeeper) that it is serving the segment. At this point, the segment
is queryable.
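The loading steps above can be sketched in Python as follows. Plain dictionaries stand in for the local disk cache, the segment metadata held in Zookeeper, and deep storage; the function name \texttt{load\_segment} and all keys are hypothetical, and decompression and processing are omitted.
\begin{verbatim}
def load_segment(seg, disk, metadata, deep,
                 announced):
    # disk:      segment id -> bytes (local cache)
    # metadata:  segment id -> {"location": key}
    # deep:      location key -> bytes
    # announced: set of segment ids being served
    if seg not in disk:
        # Cache miss: find where the segment
        # lives, then download it.
        location = metadata[seg]["location"]
        disk[seg] = deep[location]
    announced.add(seg)  # now queryable
    return disk[seg]

disk = {"seg-a": b"cached"}
metadata = {
    "seg-b": {"location": "bucket/seg-b"},
}
deep = {"bucket/seg-b": b"downloaded"}
announced = set()
for seg in ("seg-a", "seg-b"):
    load_segment(seg, disk, metadata, deep,
                 announced)
\end{verbatim}
In this sketch \texttt{seg-a} is served from the local cache without touching deep storage, while \texttt{seg-b} is downloaded first; both are announced only after a local copy exists.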
\subsubsection{Tiers}
\label{sec:tiers}
Compute nodes can be grouped in different tiers, where all nodes in a
given tier are identically configured. Different performance and
fault-tolerance parameters can be set for each tier. The purpose of
tiered nodes is to enable higher or lower priority segments to be
distributed according to their importance. For example, it is possible
to spin up a “hot” tier of compute nodes that have a high number of
cores and a large RAM capacity. The “hot” tier can be configured to
download more frequently accessed segments. A parallel “cold” tier
can also be created with much less powerful backing hardware. The
“cold” tier would only contain less frequently accessed segments.
\subsection{Real-time Nodes}
\label{sec:realtime}
Real-time nodes encapsulate the functionality to ingest and query
real-time data streams. Data indexed via these nodes is immediately
available for querying. Real-time nodes are a consumer of data and
require a corresponding producer to provide the data
stream. Typically, for data durability purposes, a message bus such as
Kafka \cite{kreps2011kafka} sits between the producer and the real-time node as shown
in Figure~\ref{fig:data-ingestion}.
\begin{figure}
\centering
\includegraphics[width = 3in]{druid_message_bus}
\caption{Real-time data ingestion.}
\label{fig:data-ingestion}
\end{figure}
The purpose of the message bus in Figure~\ref{fig:data-ingestion} is to act as a buffer for
incoming events. The message bus can maintain offsets indicating the
position in an event stream that a real-time node has read up to and
real-time nodes can update these offsets periodically. The message bus also acts as a backup storage for recent events.
Real-time nodes ingest data by reading events from the message bus. The time from event creation to message bus storage to
event consumption is on the order of hundreds of milliseconds.
Real-time nodes maintain an in-memory index for all incoming
events. These indexes are incrementally populated as new events appear on the message bus. The indexes are also directly queryable.
Real-time nodes persist their indexes to disk either periodically or after some maximum row limit is
reached. After each persist, a real-time node updates the message bus
with the offset of the last event of the most recently persisted
index. Each persisted index is immutable. If a real-time node fails and recovers, it can simply reload
any indexes that were persisted to disk and continue reading the
message bus from the point the last offset was committed. Periodically committing offsets reduces the number of messages a real-time
node has to rescan after a failure scenario.
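The interplay of persisting and offset commits can be illustrated with a small Python simulation. The classes \texttt{MessageBus} and \texttt{RealtimeNode} are hypothetical stand-ins and not Druid or Kafka APIs; for simplicity the sketch persists only when a row limit is reached and ignores the periodic trigger.
\begin{verbatim}
class MessageBus:
    # Append-only log plus one committed offset.
    def __init__(self, events):
        self.events = list(events)
        self.committed = 0

class RealtimeNode:
    def __init__(self, bus, max_rows):
        self.bus = bus
        self.max_rows = max_rows
        self.position = bus.committed
        self.in_memory = []  # mutable index
        self.persisted = []  # immutable indexes

    def ingest(self, n):
        # Read up to n events from the bus.
        for _ in range(n):
            if self.position >= len(self.bus.events):
                break
            event = self.bus.events[self.position]
            self.in_memory.append(event)
            self.position += 1
            if len(self.in_memory) >= self.max_rows:
                self.persist()

    def persist(self):
        # Freeze the index, then commit the offset.
        self.persisted.append(tuple(self.in_memory))
        self.in_memory = []
        self.bus.committed = self.position

    def recover(self):
        # In-memory state is lost; persisted
        # indexes survive. Resume at last commit.
        self.in_memory = []
        self.position = self.bus.committed

    def rows(self):
        out = [e for idx in self.persisted
               for e in idx]
        return out + self.in_memory

bus = MessageBus(range(10))
node = RealtimeNode(bus, max_rows=4)
node.ingest(6)   # persists 0..3, holds 4 and 5
node.recover()   # simulated failure
node.ingest(10)  # rescans from offset 4
\end{verbatim}
After the simulated failure the node re-reads only the two events that were not yet persisted, and every event ends up in the unified view exactly once.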
Real-time nodes maintain a consolidated view of the currently updating
index and of all indexes persisted to disk. This unified view allows
all indexes on a node to be queried. On a periodic basis, the nodes will
schedule a background task that searches for all persisted indexes of
a data source. The task merges these indexes together and builds a
historical segment. The nodes will upload the segment to deep storage
and provide a signal for the historical compute nodes to begin serving
the segment. The ingest, persist, merge, and handoff steps are fluid;
there is no data loss as a real-time node converts a real-time segment
to a historical one. Figure~\ref{fig:data-durability} illustrates the process.
\begin{figure}
\centering
\includegraphics[width = 3in]{druid_realtime_flow}
\caption{Real-time data durability.}
\label{fig:data-durability}
\end{figure}
Similar to compute nodes, real-time nodes announce segments in
Zookeeper. Unlike historical segments, real-time segments may
represent a period of time that extends into the future. For example,
a real-time node may announce it is serving a segment that contains
data for the current hour. Before the end of the hour, the real-time
node continues to collect data for the hour. Every 10 minutes (the
persist period is configurable), the node will flush and persist its
in-memory index to disk. At the end of the current hour, the real-time
node prepares to serve data for the next hour by creating a new index
and announcing a new segment for the next hour. The node does not
merge and build a historical segment for the previous hour
until some window period has passed. Having a window period
allows for straggling data points to come in and minimizes the risk of
data loss. At the end of the window period, the real-time node will
merge all persisted indexes, build a historical segment for the
previous hour, and hand the segment off to historical nodes to
serve. Once the segment is queryable on the historical nodes, the
real-time node flushes all information about the segment and
unannounces that it is serving the segment.
Real-time nodes are highly scalable. If the data volume and ingestion
rates for a given data source exceed the maximum capabilities of a
single node, additional nodes can be added. Multiple nodes can
consume events from the same stream, and every individual node only
holds a portion of the total number of events. This creates natural
partitions across nodes. Each node announces the real-time segment it
is serving and each real-time segment has a partition number. Data
from individual nodes will be merged at the Broker level. To our
knowledge, the largest production level real-time Druid cluster is
consuming approximately 2 TB of raw data per hour.
\subsection{Broker Nodes}
Broker nodes act as query routers to other queryable nodes such as
compute and real-time nodes. Broker nodes understand the metadata
published in Zookeeper about what segments exist and on what nodes the
segments are stored. Broker nodes route incoming queries such that the queries hit
the right storage nodes. Broker nodes also merge partial results from
storage nodes before returning a final consolidated result to the
caller. Additionally, brokers provide an extra level of data
durability as they maintain a cache of recent results. In the event
that multiple storage nodes fail and all copies of a segment are
somehow lost, it is still possible that segment results can be
returned if that information exists in the cache.
\subsubsection{Timeline}
To determine the correct nodes to forward queries to, Broker nodes
first build a view of the world from information in Zookeeper. Recall
that Druid uses Zookeeper to maintain information about all compute
and real-time nodes in a cluster and the segments those nodes are
serving. For every data source in Zookeeper, the Broker node builds a
timeline of segments for the data source and the nodes that serve them. A timeline
consists of segments and represents which segments contain data for
what ranges of time. Druid may have multiple segments where the data
source and interval are the same but versions differ. The timeline
view will always surface segments with the most recent version
identifier for a time range. If the intervals of two segments overlap, the segment with the more recent
version always has precedence. When queries are received for a specific
data source and interval, the Broker node performs a lookup on the
timeline associated with the query data source for the query interval
and retrieves the segments that contain data for the query. The broker
node maps these segments to the storage nodes that serve them and
forwards the query down to the respective nodes.
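A simplified version of the timeline lookup is sketched below in Python. The sketch assumes that all segments of a data source use identical, aligned intervals (represented here as integer hour pairs) and that version identifiers compare correctly as strings; partially overlapping intervals are not handled. The function names and node names are hypothetical.
\begin{verbatim}
def build_timeline(announcements):
    # announcements: (interval, version, node)
    # tuples for one data source. Only the
    # newest version per interval is kept.
    timeline = {}
    for interval, version, node in announcements:
        current = timeline.get(interval)
        if current is None or version > current[0]:
            timeline[interval] = (version, [node])
        elif version == current[0]:
            current[1].append(node)
    return timeline

def lookup(timeline, q_start, q_end):
    # Segments whose interval overlaps the query.
    hits = []
    for key in sorted(timeline):
        start, end = key
        version, nodes = timeline[key]
        if start < q_end and q_start < end:
            hits.append((key, version, nodes))
    return hits

ANNOUNCED = [
    ((1, 2), "v1", "node-a"),
    ((1, 2), "v2", "node-b"),
    ((2, 3), "v1", "node-a"),
    ((2, 3), "v1", "node-c"),
]
timeline = build_timeline(ANNOUNCED)
\end{verbatim}
For the interval \texttt{(1, 2)} only the \texttt{v2} segment on \texttt{node-b} is surfaced, while the interval \texttt{(2, 3)} maps to both nodes that serve the same version.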
\subsubsection{Caching}
\label{sec:caching}
Broker nodes employ a distributed cache with an LRU \cite{o1993lru,
kim2001lrfu} cache eviction strategy. The broker cache stores
per-segment results. The cache can be local to each broker node or
shared across multiple nodes using an external distributed cache
such as memcached \cite{fitzpatrick2004distributed}. Recall that each time a broker node receives a
query, it first maps the query to a set of segments. A subset of
these segment results may already exist in the cache and the results
can be directly pulled from the cache. For any segment results that
do not exist in the cache, the broker node will forward the query
to the compute nodes. Once the compute nodes return their results,
the broker will store those results in the cache. Real-time segments
are never cached and hence requests for real-time data will always
be forwarded to real-time nodes. Real-time data is perpetually
changing and caching the results would be unreliable.
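The caching behavior described above can be sketched as follows. This is a minimal local LRU cache built on Python's \texttt{OrderedDict}, not the broker's implementation; the names \texttt{SegmentCache}, \texttt{run\_query}, and \texttt{fake\_compute} are hypothetical, and partial results are merged by simple summation.
\begin{verbatim}
from collections import OrderedDict

class SegmentCache:
    # Local LRU cache keyed by (query, segment).
    def __init__(self, capacity):
        self.capacity = capacity
        self.entries = OrderedDict()

    def get(self, key):
        if key not in self.entries:
            return None
        self.entries.move_to_end(key)
        return self.entries[key]

    def put(self, key, value):
        self.entries[key] = value
        self.entries.move_to_end(key)
        if len(self.entries) > self.capacity:
            # Evict the least recently used entry.
            self.entries.popitem(last=False)

def run_query(query, segments, cache, compute):
    # segments: (segment id, is_realtime) pairs.
    # compute: asks a storage node for a result.
    results = []
    for seg, is_realtime in segments:
        key = (query, seg)
        hit = None
        if not is_realtime:
            hit = cache.get(key)
        if hit is None:
            hit = compute(query, seg)
            if not is_realtime:
                cache.put(key, hit)
        results.append(hit)
    return sum(results)

calls = []
def fake_compute(query, seg):
    calls.append(seg)
    return 1

cache = SegmentCache(capacity=2)
segs = [("hist-1", False), ("rt-1", True)]
first = run_query("count", segs, cache,
                  fake_compute)
second = run_query("count", segs, cache,
                   fake_compute)
\end{verbatim}
On the second run the historical segment is answered from the cache, whereas the real-time segment is forwarded to its node again.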
\subsection{Coordination (Master) Nodes}
The Druid coordination or master nodes are primarily in charge of
segment management and distribution. The Druid master is responsible
for loading new segments, dropping outdated segments, managing segment
replication, and balancing segment load. Druid uses a multi-version
concurrency control swapping protocol for managing segments in order
to maintain stable views.
The Druid master runs periodically to determine the current state of
the cluster. It makes decisions by comparing the expected state of the
cluster with the actual state of the cluster at the time of the
run. As with all Druid nodes, the Druid master maintains a connection
to Zookeeper for current cluster information. The master also
maintains a connection to a MySQL database that contains additional
operational parameters and configurations. One of the key pieces of
information located in the MySQL database is a segment table that
contains a list of historical segments that should be served. This
table can be updated by any service that creates historical
segments. The MySQL database also contains a rule table that governs
how segments are created, destroyed, and replicated in the cluster.
The master does not directly communicate with a compute node when
assigning it work; instead the master creates an ephemeral znode in
Zookeeper containing information about what the compute node should
do. The compute node maintains a similar connection to Zookeeper to
monitor for new work.
\subsubsection{Rules}
Historical segments are loaded and dropped from the cluster based on a
set of rules. Rules indicate how segments should be assigned to
different compute node tiers and how many replicas of a segment
should exist in each tier. Rules may also indicate when segments
should be dropped entirely from the cluster. The master loads a set of
rules from a rule table in the MySQL database. Rules may be specific
to a certain data source and/or a default set of rules can be
configured. The master will cycle through all available segments and
match each segment with the first rule that applies to it.
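First-match rule evaluation can be illustrated with the following Python sketch. The rule fields (\texttt{source}, \texttt{max\_age\_days}, \texttt{action}, \texttt{tier}, \texttt{replicas}) are invented for this example and do not describe the schema of the rule table.
\begin{verbatim}
def matches(rule, segment):
    # A rule with source None is a default rule.
    src = rule["source"]
    source_ok = src in (None, segment["source"])
    age = segment["age_days"]
    return source_ok and age <= rule["max_age_days"]

def first_match(rules, segment):
    # Rules are ordered; the first match wins.
    for rule in rules:
        if matches(rule, segment):
            return rule
    return None

RULES = [
    {"source": "ads", "max_age_days": 30,
     "action": "load", "tier": "hot",
     "replicas": 2},
    {"source": None, "max_age_days": 365,
     "action": "load", "tier": "cold",
     "replicas": 1},
    {"source": None,
     "max_age_days": float("inf"),
     "action": "drop"},
]

recent = {"source": "ads", "age_days": 7}
older = {"source": "ads", "age_days": 90}
stale = {"source": "video", "age_days": 400}
\end{verbatim}
Here the recent \texttt{ads} segment matches the data-source-specific rule, the older one falls through to the default load rule, and the stale segment is dropped.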
\subsubsection{Load Balancing}
In a typical production environment, queries often hit dozens or even
hundreds of data segments. Since each compute node has limited
resources, historical segments must be distributed among the cluster
to ensure that the cluster load is not too imbalanced. Determining
optimal load distribution requires some knowledge about query patterns
and speeds. Typically, queries cover recent data spanning contiguous
time intervals for a single data source. On average, queries that
access smaller segments are faster.
These query patterns suggest replicating recent historical segments at
a higher rate, spreading out large segments that are close in time to
different compute nodes, and co-locating segments from different data
sources. To optimally distribute and balance segments among the
cluster, we developed a cost-based optimization procedure that takes
into account the segment data source, recency, and size. The exact
details of the algorithm are beyond the scope of this paper.
\section{Query API}
\label{sec:query-api}
Queries to Druid are made in the form of POST requests. All queryable
Druid nodes share the same query API. The body of the POST request is
a JSON object containing key-value pairs specifying various query
parameters. A typical query will contain the data source name, the
granularity of the result data, the time range of interest, and the
type of request. A sample time series query is shown below:
\begin{verbatim}
{
"queryType" : "timeseries",
"dataSource" : "sample_data",
"intervals" : "2013-01-01/2013-01-02",
"filter" : {
"type" : "selector",
"dimension" : "poets",
"value" : "Ke$ha"
},
"granularity" : "day",
"aggregations" : [
{
"type" : "count",
"fieldName" : "row",
"name" : "row"
}
]
}
\end{verbatim}
Certain query types will also support a filter set. A filter set is an
arbitrary Boolean expression of dimension name and value
pairs. Support for complex nested filter sets enables flexibility and
provides the ability to deeply explore data.
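To illustrate what a nested filter set expresses, the Python sketch below evaluates a Boolean filter tree against a single row. The \texttt{selector} node mirrors the sample query above; the \texttt{and}, \texttt{or}, and \texttt{not} node shapes and the \texttt{evaluate} function are assumptions made for this example, and the evaluation is row-at-a-time rather than index-based.
\begin{verbatim}
def evaluate(flt, row):
    # Recursively evaluate a filter on one row.
    kind = flt["type"]
    if kind == "selector":
        dim = flt["dimension"]
        return row.get(dim) == flt["value"]
    if kind == "and":
        return all(evaluate(f, row)
                   for f in flt["fields"])
    if kind == "or":
        return any(evaluate(f, row)
                   for f in flt["fields"])
    if kind == "not":
        return not evaluate(flt["field"], row)
    raise ValueError("unknown type: " + kind)

def sel(dimension, value):
    return {"type": "selector",
            "dimension": dimension,
            "value": value}

# gender = Male AND
#   (publisher = bieberfever.com
#    OR NOT country = USA)
FILTER = {
    "type": "and",
    "fields": [
        sel("gender", "Male"),
        {"type": "or", "fields": [
            sel("publisher", "bieberfever.com"),
            {"type": "not",
             "field": sel("country", "USA")},
        ]},
    ],
}
\end{verbatim}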
The exact query syntax depends on the query type and the information requested.
It is beyond the scope of this paper to describe the Query API in full detail.
We are also in the process of building out SQL support for Druid.
\section{Storage}
\label{sec:storage}
Druid is a column-oriented data store. When considering aggregates
over a large number of events, the advantages of storing data as columns
rather than rows are well documented \cite{cattell2011scalable}. Column storage allows for
more efficient CPU usage as only what is needed is actually loaded and
scanned. In a row-oriented data store, all columns associated with a
row must be scanned as part of an aggregation. The additional scan
time can introduce performance degradations as high as 250\% \cite{bear2012vertica}.
\subsection{Column Types}
Druid has multiple column types to represent the various column value
formats. Depending on the column type, different compression methods
are used to reduce the cost of storing a column in memory and on
disk. In the example given in Table~\ref{tab:sample_data}, the
publisher, advertiser, gender, and country columns only contain
strings. String columns can be dictionary encoded. Dictionary encoding
is a common method to compress data and has been used in other data
stores such as Powerdrill \cite{hall2012processing}. In the example in
Table~\ref{tab:sample_data}, we can map each publisher to a unique
integer identifier.
\begin{verbatim}
bieberfever.com -> 0
ultratrimfast.com -> 1
\end{verbatim}
This mapping allows us to represent the publisher column as an integer
array where the array indices correspond to the rows of the original
data set. For the publisher column, we can represent the unique
publishers as follows:
\begin{verbatim}
[0, 0, 1, 1]
\end{verbatim}
The resulting integer array lends itself very well to
compression. Generic compression algorithms on top of encodings are
very common in column-stores. We opted to use the LZF \cite{liblzf2013} compression
algorithm.
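The dictionary encoding of the publisher column can be reproduced with a few lines of Python. This is a minimal sketch in which identifiers are assigned in order of first appearance; the function names are hypothetical and the subsequent LZF compression step is not shown.
\begin{verbatim}
def dictionary_encode(values):
    # Assign ids in order of first appearance.
    ids = {}
    encoded = []
    for v in values:
        if v not in ids:
            ids[v] = len(ids)
        encoded.append(ids[v])
    return ids, encoded

def dictionary_decode(ids, encoded):
    reverse = {i: v for v, i in ids.items()}
    return [reverse[i] for i in encoded]

PUBLISHERS = [
    "bieberfever.com",
    "bieberfever.com",
    "ultratrimfast.com",
    "ultratrimfast.com",
]
ids, encoded = dictionary_encode(PUBLISHERS)
# encoded is [0, 0, 1, 1]
\end{verbatim}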
We can leverage similar compression algorithms for numeric
columns. For example, the clicks and revenue columns in
Table~\ref{tab:sample_data} can also be expressed as individual
arrays.
\begin{verbatim}
Clicks -> [25, 42, 17, 170]
Revenue -> [15.70, 29.18, 17.31, 34.01]
\end{verbatim}
In this case we compress the raw values as opposed to their dictionary
representations.
\subsection{Filters}
To support arbitrary filter sets, Druid creates additional lookup
indices for string columns. These lookup indices are compressed and
Druid operates over the indices in their compressed form. Filters can
be expressed as Boolean equations of multiple lookup indices. Boolean
operations on indices in their compressed form are both performance and
space efficient.
Let us consider the publisher column in
Table~\ref{tab:sample_data}. For each unique publisher in
Table~\ref{tab:sample_data}, we can form some representation
indicating in which table rows a particular publisher is seen. We can
store this information in a binary array where the array indices
represent our rows. If a particular publisher is seen in a certain
row, that array index is marked as \texttt{1}. For example:
\begin{verbatim}
bieberfever.com -> rows [0, 1] -> [1][1][0][0]
ultratrimfast.com -> rows [2, 3] -> [0][0][1][1]
\end{verbatim}
\texttt{bieberfever.com} is seen in rows \texttt{0} and \texttt{1}. This mapping of column values
to row indices forms an inverted index \cite{tomasic1993performance}. If we want to know which
rows contain \texttt{bieberfever.com} or \texttt{ultratrimfast.com}, we can \texttt{OR} together
the \texttt{bieberfever.com} and \texttt{ultratrimfast.com} arrays.
\begin{verbatim}
[1][1][0][0] OR [0][0][1][1] = [1][1][1][1]
\end{verbatim}
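The inverted index and the \texttt{OR} filter can be sketched in a few
lines of Python. This is an illustrative sketch only: it uses
uncompressed Python lists rather than compressed bitmaps, and the
helpers \texttt{build\_index} and \texttt{bitmap\_or} are hypothetical
names, not part of Druid.
\begin{verbatim}
def build_index(column):
    """Return value -> 0/1 flags, one per row."""
    index = {}
    n = len(column)
    for row, value in enumerate(column):
        bits = index.setdefault(value, [0] * n)
        bits[row] = 1
    return index

def bitmap_or(a, b):
    """Element-wise OR of two bitmaps."""
    return [x | y for x, y in zip(a, b)]

publisher = [
    "bieberfever.com",
    "bieberfever.com",
    "ultratrimfast.com",
    "ultratrimfast.com",
]
index = build_index(publisher)
either = bitmap_or(index["bieberfever.com"],
                   index["ultratrimfast.com"])
print(either)  # [1, 1, 1, 1]
\end{verbatim}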
\begin{figure}
\centering
\includegraphics[width = 3in]{concise_plot}
\caption{Integer array size versus CONCISE set size.}
\label{fig:concise_plot}
\end{figure}
This approach of performing Boolean operations on large bitmap sets is
commonly used in search engines. Bitmap compression algorithms are a
well-defined area of research and often utilize run-length
encoding. Well-known algorithms include Byte-aligned Bitmap Code \cite{antoshenkov1995byte},
Word-Aligned Hybrid (WAH) code \cite{wu2006optimizing}, and Partitioned Word-Aligned
Hybrid (PWAH) compression \cite{van2011memory}. Druid opted to use the CONCISE
algorithm \cite{colantonio2010concise} as it can outperform WAH by reducing the size of the
compressed bitmaps by up to 50\%. Figure~\ref{fig:concise_plot} illustrates the number of
bytes using CONCISE compression versus using an integer array. The
results were generated on a cc2.8xlarge system with a single thread,
2G heap, 512m young gen, and a forced GC between each run. The data
set is a single day’s worth of data collected from the Twitter garden
hose \cite{twitter2013} data stream. The data set contains 2,272,295 rows and 12
dimensions of varying cardinality. As an additional comparison, we
also re-sorted the data set rows to maximize compression.
In the unsorted case, the total CONCISE compressed size was 53,451,144
bytes and the total integer array size was 127,248,520 bytes. Overall,
CONCISE compressed sets are about 42\% of the size of integer arrays (that is, about 58\% smaller).
In the sorted case, the total CONCISE compressed size was 43,832,884
bytes and the total integer array size was 127,248,520 bytes. What is
interesting to note is that after sorting, global compression only
increased minimally. The ratio of total CONCISE set size to total integer
array size is 34\%. It is also interesting to note that as the cardinality
of a dimension approaches the total number of rows in a data set,
integer arrays actually require less space than CONCISE sets.
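For reference, the ratios quoted above follow directly from the
reported totals:
\[
\frac{53{,}451{,}144}{127{,}248{,}520} \approx 0.42,
\qquad
\frac{43{,}832{,}884}{127{,}248{,}520} \approx 0.34 .
\]
Relative to the unsorted case, sorting reduces the total CONCISE size by
\[
\frac{53{,}451{,}144 - 43{,}832{,}884}{53{,}451{,}144} \approx 0.18,
\]
that is, by roughly 18\%.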
\subsection{Storage Engine}
Druid’s persistence components allow for different storage engines to
be plugged in, similar to Dynamo \cite{decandia2007dynamo}. These storage engines may store
data in in-memory structures such as the JVM heap or in memory-mapped
structures. The ability to swap storage engines allows Druid to be
configured according to a particular application’s specifications. An
in-memory storage engine may be operationally more expensive than a
memory-mapped storage engine but could be a better alternative if
performance is critical. At Metamarkets, we commonly use a
memory-mapped storage engine.
\section{Robustness and Fault-Tolerance}
\label{sec:robustness}
To achieve high system availability and data durability, Druid employs
several fault recovery techniques. Druid has no single point of
failure.
\subsection{Replication}
Druid replicates historical segments on multiple hosts. The number of
replicates in each tier of the historical compute cluster is fully
configurable. Setups that require high levels of fault tolerance can
be configured to have a high number of replicates. Replicates are
assigned to compute nodes by coordination nodes using the same load
distribution algorithm discussed in Section~\ref{sec:caching}. Conceptually,
broker nodes do not distinguish historical segments from their
replicates. Broker nodes forward queries to the first node they find
that contains data for the query.
Real-time segments follow a different replication model as real-time
segments are mutable. Recall that real-time nodes act as consumers of
a data stream. Multiple real-time nodes can read from the same message
bus if each maintains a unique offset, hence creating multiple copies
of a real-time segment. If a real-time node fails and recovers, it can
reload any indexes that were persisted to disk and read from the
message bus from the point it last committed an offset.
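The recovery behavior described above can be illustrated with a toy
Python model. This is a sketch under stated assumptions, not Druid
code: the message bus is a plain list, ``disk'' is an in-process list
that we treat as surviving a failure, and the class
\texttt{RealtimeNode} and its methods are hypothetical.
\begin{verbatim}
class RealtimeNode:
    """Toy consumer with a committed offset."""

    def __init__(self, bus):
        self.bus = bus       # list of events
        self.on_disk = []    # survives failure
        self.committed = 0   # survives failure
        self.in_memory = []  # lost on failure
        self.offset = 0      # read position

    def consume(self, n):
        end = self.offset + n
        chunk = self.bus[self.offset:end]
        self.in_memory += chunk
        self.offset = min(end, len(self.bus))

    def persist(self):
        # Flush to disk, then commit the offset.
        self.on_disk += self.in_memory
        self.in_memory = []
        self.committed = self.offset

    def recover(self):
        # Volatile state is gone; resume from
        # the last committed offset.
        self.in_memory = []
        self.offset = self.committed

bus = ["e0", "e1", "e2", "e3", "e4"]
node = RealtimeNode(bus)
node.consume(2)
node.persist()   # e0, e1 are durable
node.consume(2)  # e2, e3 only in memory
node.recover()   # simulated failure
node.consume(3)  # re-reads e2 to e4
node.persist()
print(node.on_disk)
\end{verbatim}
After recovery, the events that were held only in memory are read
again from the bus, so the persisted data ends up complete and free of
duplicates in this model.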
\subsection{Local Segment Cache}
Recall that each Druid compute node maintains a local cache of
historical segments it recently served. A compute node also has a
lookup table for segments it has in its cache and stores this lookup
table on disk. When a compute node is assigned a new segment to load,
the compute node will first check its local segment cache directory to
see if the segment had been previously downloaded. If a cache entry
exists, the compute node will directly read the segment binary files
and load the segment.
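The cache check can be sketched as follows. This Python sketch is
illustrative only: the function \texttt{load\_segment}, the one file
per segment layout, and the \texttt{fake\_download} stand-in for
fetching a segment are assumptions, not Druid's actual interfaces.
\begin{verbatim}
import os
import tempfile

def load_segment(seg_id, cache_dir, download):
    """Return (path, was_cached)."""
    path = os.path.join(cache_dir, seg_id)
    if os.path.exists(path):
        return path, True   # cache hit
    data = download(seg_id)  # cache miss
    with open(path, "wb") as f:
        f.write(data)
    return path, False

def fake_download(seg_id):
    # Stand-in for fetching the segment.
    return b"segment-bytes"

cache_dir = tempfile.mkdtemp()
first = load_segment("seg_a", cache_dir,
                     fake_download)
second = load_segment("seg_a", cache_dir,
                      fake_download)
print(first[1], second[1])  # False True
\end{verbatim}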
The segment cache is also leveraged when a compute node is initially
started. During startup, a compute node will first read its lookup
table to determine what segments it has cached. All cached segments
are immediately loaded and served. This feature introduces minimal
overhead and allows a compute node to readily serve data as soon as it
comes online. By making data quickly available on startup and
minimizing initial startup time, compute nodes that become
inexplicably disconnected from the cluster can reconnect themselves
seamlessly. This also means that software updates can be pushed to
compute nodes without disruption to cluster operations. In practice, a
software update to a compute node can be completed before coordination
nodes even notice that the compute node has disconnected. At
Metamarkets, we update Druid through rolling restarts. Compute nodes
are updated one at a time and we experience no downtime or data loss
through the update process.
\subsection{Failure Detection}
If a compute node completely fails and becomes unavailable, the
ephemeral Zookeeper znodes it created are deleted. The master node
will notice that certain segments are insufficiently replicated or
missing altogether. Additional replicates will be created and
redistributed throughout the cluster.
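The check for insufficiently replicated segments can be sketched in
Python. This is a simplified illustration, assuming the master holds a
map from each segment to the set of nodes serving it; the function
\texttt{under\_replicated} and the node and segment names are
hypothetical.
\begin{verbatim}
def under_replicated(assign, live, target):
    """Return {segment: missing replicas}."""
    missing = {}
    for segment, nodes in assign.items():
        alive = len(nodes & live)
        if alive < target:
            missing[segment] = target - alive
    return missing

assign = {
    "seg_a": {"node1", "node2"},
    "seg_b": {"node2", "node3"},
    "seg_c": {"node1", "node3"},
}
live = {"node1", "node3"}  # node2 failed
result = under_replicated(assign, live, 2)
print(result)  # {'seg_a': 1, 'seg_b': 1}
\end{verbatim}
Each entry in the result corresponds to replicates that would need to
be created on the remaining nodes.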
We are moving towards building out infrastructure to support
programmatic creation of real-time nodes. In the near future, the
master node will also notice if real-time segments are insufficiently
replicated and automatically create additional real-time nodes as
redundant backups.
\subsection{Adding and Removing Nodes}
Adding and removing Druid nodes is a relatively simple process; all
that is required is to start and stop Java processes. There is minimal
operational overhead in adding nodes in batches. Scaling down the
cluster is usually done one node at a time with some time lapse
between shutdowns. This allows the master to have ample time to
redistribute load and create additional replicates. Shutting down
nodes in batches is generally not supported as it may destroy all
copies of a segment, which would lead to data loss.
\section{Performance Benchmarks}
\label{sec:benchmarks}
To benchmark Druid performance, we created a large test cluster with
6TB of uncompressed data, representing tens of billions of fact
rows. The data set contained more than a dozen dimensions, with
cardinalities ranging from the double digits to tens of millions. We computed
four metrics for each row (counts, sums, and averages). The data was
sharded first on timestamp then on dimension values, creating
thousands of shards of roughly 8 million fact rows apiece.
The cluster used in the benchmark consisted of 100 historical compute
nodes, each with 16 cores, 60GB of RAM, 10 GigE Ethernet, and 1TB of
disk space. Collectively, the cluster comprised 1600 cores, 6TB of
RAM, fast Ethernet and more than enough disk space.
SQL statements are included in Table~\ref{tab:sql_queries} to describe the
purpose of each of the queries. Please note:
\begin{itemize}
\item The timestamp range of the queries encompassed all data.
\item Each machine was a 16-core machine with 60GB RAM and 1TB of local
disk. The machine was configured to only use 15 threads for
processing queries.
\item A memory-mapped storage engine was used (the machine was configured to memory map the data
instead of loading it into the Java heap).
\end{itemize}
\begin{figure}
\centering
\includegraphics[width = 3in]{cluster_scan_rate}
\caption{Druid cluster scan rate with lines indicating linear scaling
from 25 nodes.}
\label{fig:cluster_scan_rate}
\end{figure}
\begin{figure}
\centering
\includegraphics[width = 3in]{core_scan_rate}
\caption{Druid core scan rate.}
\label{fig:core_scan_rate}
\end{figure}
\begin{table*}
\centering
\caption{Druid Queries}
\label{tab:sql_queries}
\begin{verbatim}
1 SELECT count(*) FROM _table_ WHERE timestamp >= ? AND timestamp < ?
2 SELECT count(*), sum(metric1) FROM _table_ WHERE timestamp >= ? AND timestamp < ?
3 SELECT count(*), sum(metric1), sum(metric2), sum(metric3), sum(metric4) FROM _table_
WHERE timestamp >= ? AND timestamp < ?
4 SELECT high_card_dimension, count(*) AS cnt FROM _table_
WHERE timestamp >= ? AND timestamp < ?
GROUP BY high_card_dimension ORDER BY cnt limit 100
5 SELECT high_card_dimension, count(*) AS cnt, sum(metric1) FROM _table_
WHERE timestamp >= ? AND timestamp < ?
GROUP BY high_card_dimension ORDER BY cnt limit 100
6 SELECT high_card_dimension, count(*) AS cnt, sum(metric1), sum(metric2), sum(metric3), sum(metric4)
FROM _table_ WHERE timestamp >= ? AND timestamp < ?
GROUP BY high_card_dimension ORDER BY cnt limit 100
\end{verbatim}
\end{table*}
%\begin{enumerate}
%\item \texttt{Select high\_card\_dimension, count(*), sum(metric1), sum(metric2), sum(metric3), sum(metric4) AS cnt from \_table\_ where timestamp $\geq$ ? and timestamp $<$ ? group by high\_card\_dimension order by cnt limit 100;}
%\end{enumerate}
Figure~\ref{fig:cluster_scan_rate} shows the cluster scan rate and
Figure~\ref{fig:core_scan_rate} shows the core scan rate. In
Figure~\ref{fig:cluster_scan_rate} we also include projected linear
scaling based on the results of the 25 node cluster. In particular,
we observe diminishing marginal returns to performance in the size of
the cluster. Under linear scaling, SQL query 1 would have achieved a
speed of 37 billion rows per second on our 75 node cluster. In fact,
the speed was 26 billion rows per second. The speedup of a parallel
computing system is often limited by the time needed for the
sequential operations of the system, in accordance with Amdahl's law
\cite{amdahl1967validity}.
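To make the gap concrete, the observed rate for query 1 on 75 nodes,
relative to the linear projection, is
\[
\frac{26 \times 10^{9}}{37 \times 10^{9}} \approx 0.70 .
\]
For reference, Amdahl's law bounds the speedup $S(N)$ achievable on $N$
processors by
\[
S(N) = \frac{1}{(1 - p) + p/N},
\]
where $p$ is the fraction of the work that can be parallelized. The
symbols $S$, $N$ and $p$ are notation introduced here for illustration;
we do not attempt to estimate $p$ for Druid from these measurements.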
The first query listed in Table~\ref{tab:sql_queries} is a simple
count, achieving scan rates of 33M rows/second/core. We believe
the 75 node cluster was actually overprovisioned for the test
dataset, explaining the modest improvement over the 50 node cluster.
Druid's concurrency model is based on shards: one thread will scan one
shard. If the number of segments on a compute node modulo the number
of cores is small (e.g. 17 segments and 15 cores), then many of the
cores will be idle during the last round of the computation.
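The idle-core effect can be quantified with a short Python sketch. It
assumes, as a simplification, that every segment takes the same time
to scan and that a node has at least one segment; the function
\texttt{last\_round\_idle} is a hypothetical helper.
\begin{verbatim}
import math

def last_round_idle(segments, cores):
    """Return (rounds, idle cores in last)."""
    rounds = math.ceil(segments / cores)
    done_before = (rounds - 1) * cores
    busy_last = segments - done_before
    return rounds, cores - busy_last

print(last_round_idle(17, 15))  # (2, 13)
print(last_round_idle(30, 15))  # (2, 0)
\end{verbatim}
With 17 segments and 15 cores, the second round keeps only 2 cores
busy while 13 sit idle, whereas 30 segments would keep all cores busy
in both rounds.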
When we include more aggregations we see performance degrade. This is
because of the column-oriented storage format Druid employs. For the
\texttt{count(*)} queries, Druid only has to check the timestamp column to satisfy
the ``where'' clause. As we add metrics, it has to also load those metric
values and scan over them, increasing the amount of memory scanned.
\section{Related Work}
\label{sec:related}
Cattell \cite{cattell2011scalable} maintains a great summary of existing scalable SQL and
NoSQL data stores. In the landscape of distributed data stores, Druid
feature-wise sits somewhere between Google’s Dremel \cite{melnik2010dremel} and Powerdrill
\cite{hall2012processing}. Druid has most of the features implemented in Dremel (Dremel
handles arbitrary nested data structures while Druid only allows for a
single level of array-based nesting) and many of the interesting
compression algorithms mentioned in Powerdrill.
Although Druid builds on many of the same principles as other
distributed column-oriented data stores \cite{fink2012distributed}, most existing data
stores are designed to be key-value stores \cite{lerner2010redis}, or
document/extensible record stores \cite{stonebraker2005c}. Such data stores are great
solutions for traditional data warehouse needs and general
back-office/reporting usage. Typically, analysts will query these data
stores and build reports from the results. In-memory databases such as
SAP’s HANA \cite{farber2012sap} and VoltDB \cite{voltdb2010voltdb} are examples of other data stores that
are highly suited for traditional data warehousing needs. Druid is a
front-office system designed such that user-facing dashboards can be
built on top of it. Similar to \cite{paraccel2013}, Druid has analytical features
built in. The main features Druid offers over traditional data
warehousing solutions are real-time data ingestion and interactive
query latencies. In terms of real-time
ingestion and processing of data, Trident/Storm \cite{marz2013storm} and Streaming
Spark \cite{zaharia2012discretized} are other popular real-time computation systems, although
they lack the data storage capabilities of Druid. Spark/Shark \cite{engle2012shark} are
also doing similar work in the area of fast data analysis on large
scale data sets. Cloudera Impala \cite{cloudera2013} is another system focused on
optimizing querying performance, but more so in Hadoop environments.
Druid leverages a unique combination of algorithms in its
architecture. Although we believe no other data store has the same set
of functionality as Druid, some of Druid’s features such as using
inverted indices to perform faster filters also exist in other data
stores \cite{macnicol2004sybase}.
\section{Conclusions}
\label{sec:conclusions}
In this paper, we presented Druid, a distributed, column-oriented,
real-time analytical data store. Druid is a highly customizable
solution that is optimized for fast query latencies. Druid ingests
data in real-time and is fault-tolerant. We discussed the performance
of Druid on billion-row data sets. We summarized key Druid architecture
aspects such as the storage format, query language and general
execution. In the future, we plan to cover more in depth the different
algorithms we’ve developed for Druid and how other systems may plug
into Druid to achieve powerful use cases.
\section{Acknowledgements}
\label{sec:acknowledgements}
We want to thank Steve Harris and Jaypal Sethi for their feedback on improving this paper.
We also want to give recognition to Adam Smith; without him the first Metamarkets hackathon would not have
been organized and this paper would not have been created. Another special recognition to Katherine Chu for
helping to create all the images in this paper.
Druid could not have been built without the help of many great
engineers at Metamarkets and in the community. We want to thank Danny Yuan, Jae Hyeon Bae, Paul Baclace, Dave
Nielsen, and Dhruv Parthasarathy for their
contributions to Druid.
% The following two commands are all you need in the
% initial runs of your .tex file to
% produce the bibliography for the citations in your paper.
\bibliographystyle{abbrv}
\bibliography{druid} % druid.bib is the name of the Bibliography in this case
% You must have a proper ".bib" file
% and remember to run:
% latex bibtex latex latex
% to resolve all references
%Generated by bibtex from your ~.bib file. Run latex,
%then bibtex, then latex twice (to resolve references).
%APPENDIX is optional.
% ****************** APPENDIX **************************************
% Example of an appendix; typically would start on a new page
%pagebreak
\end{document}