| % THIS IS AN EXAMPLE DOCUMENT FOR VLDB 2012 |
| % based on ACM SIGPROC-SP.TEX VERSION 2.7 |
| % Modified by Gerald Weber <gerald@cs.auckland.ac.nz> |
| % Removed the requirement to include *bbl file in here. (AhmetSacan, Sep2012) |
| % Fixed the equation on page 3 to prevent line overflow. (AhmetSacan, Sep2012) |
| |
| \documentclass{vldb} |
| \usepackage{graphicx} |
| \usepackage{balance} % for \balance command ON LAST PAGE (only there!) |
| \usepackage{fontspec} |
| \setmainfont[Ligatures={TeX}]{Times} |
| \usepackage{hyperref} |
| \graphicspath{{figures/}} |
| |
| \hyphenation{metamarkets nelson} |
| |
| \begin{document} |
| |
| % ****************** TITLE **************************************** |
| |
\title{Druid: A Real-time Analytical Data Store}
| |
| % possible, but not really needed or used for PVLDB: |
| %\subtitle{[Extended Abstract] |
| %\titlenote{A full version of this paper is available as\textit{Author's Guide to Preparing ACM SIG Proceedings Using \LaTeX$2_\epsilon$\ and BibTeX} at \texttt{www.acm.org/eaddress.htm}}} |
| |
| % ****************** AUTHORS ************************************** |
| |
| % You need the command \numberofauthors to handle the 'placement |
| % and alignment' of the authors beneath the title. |
| % |
| % For aesthetic reasons, we recommend 'three authors at a time' |
| % i.e. three 'name/affiliation blocks' be placed beneath the title. |
| % |
| % NOTE: You are NOT restricted in how many 'rows' of |
| % "name/affiliations" may appear. We just ask that you restrict |
| % the number of 'columns' to three. |
| % |
| % Because of the available 'opening page real-estate' |
| % we ask you to refrain from putting more than six authors |
| % (two rows with three columns) beneath the article title. |
| % More than six makes the first-page appear very cluttered indeed. |
| % |
| % Use the \alignauthor commands to handle the names |
| % and affiliations for an 'aesthetic maximum' of six authors. |
| % Add names, affiliations, addresses for |
| % the seventh etc. author(s) as the argument for the |
| % \additionalauthors command. |
| % These 'additional authors' will be output/set for you |
| % without further effort on your part as the last section in |
| % the body of your article BEFORE References or any Appendices. |
| |
\numberofauthors{7} % in this paper, there are a *total*
% of SEVEN authors. SIX appear on the 'first-page' (for formatting
% reasons) and the remaining one appears in the \additionalauthors section.
| |
| \author{ |
| % You can go ahead and credit any number of authors here, |
| % e.g. one 'row of three' or two rows (consisting of one row of three |
| % and a second row of one, two or three). |
| % |
| % The command \alignauthor (no curly braces needed) should |
| % precede each author name, affiliation/snail-mail address and |
| % e-mail address. Additionally, tag each line of |
| % affiliation/address with \affaddr, and tag the |
| % e-mail address with \email. |
| % |
| % 1st. author |
| \alignauthor Fangjin Yang\\ |
| \affaddr{Metamarkets, Inc.}\\ |
| \affaddr{625 2nd Street, Suite 230}\\ |
| \affaddr{San Francisco, CA, USA}\\ |
| \email{fangjin@metamarkets.com} |
| \alignauthor Eric Tschetter\\ |
| \affaddr{Metamarkets, Inc.}\\ |
| \affaddr{625 2nd Street, Suite 230}\\ |
| \affaddr{San Francisco, CA, USA}\\ |
| \email{eric@metamarkets.com} |
| \alignauthor Gian Merlino\\ |
| \affaddr{Metamarkets, Inc.}\\ |
| \affaddr{625 2nd Street, Suite 230}\\ |
| \affaddr{San Francisco, CA, USA}\\ |
| \email{gian@metamarkets.com} |
| \and |
| \alignauthor Nelson Ray\\ |
| \affaddr{Metamarkets, Inc.}\\ |
| \affaddr{625 2nd Street, Suite 230}\\ |
| \affaddr{San Francisco, CA, USA}\\ |
| \email{nelson@metamarkets.com} |
| \alignauthor Xavier Léauté\\ |
| \affaddr{Metamarkets, Inc.}\\ |
| \affaddr{625 2nd Street, Suite 230}\\ |
| \affaddr{San Francisco, CA, USA}\\ |
| \email{xavier@metamarkets.com} |
| \alignauthor Deep Ganguli\\ |
| \affaddr{Metamarkets, Inc.}\\ |
| \affaddr{625 2nd Street, Suite 230}\\ |
| \affaddr{San Francisco, CA, USA}\\ |
| \email{deep@metamarkets.com} |
| } |
| % There's nothing stopping you putting the seventh, eighth, etc. |
| % author on the opening page (as the 'third row') but we ask, |
| % for aesthetic reasons that you place these 'additional authors' |
| % in the \additional authors block, viz. |
| \additionalauthors{Michael Driscoll (Metamarkets, \texttt{mike@metamarkets.com})} |
| \date{21 March 2013} |
| % Just remember to make sure that the TOTAL number of authors |
| % is the number that will appear on the first page PLUS the |
| % number that will appear in the \additionalauthors section. |
| |
| \maketitle |
| |
| \begin{abstract} |
| Druid is a scalable, real-time analytical data store that supports |
| ad-hoc queries on large-scale data sets. The system combines a |
| columnar data layout, a shared-nothing architecture, and an advanced |
| indexing structure to allow arbitrary exploration of billion-row |
| tables with sub-second latencies. Druid scales horizontally and is the |
| core engine of the Metamarkets platform. In this paper, we detail the |
| architecture and implementation of Druid and describe how it solves |
| the real-time data ingestion problem. |
| \end{abstract} |
| |
| \section{Introduction} |
In recent years, enterprises have faced ever-growing collections of
data in all forms, with scales reaching terabytes, even petabytes, of
new information per day. Companies are increasingly realizing
| the importance of unlocking the insights contained within this |
| data. Numerous database vendors such as IBM’s Netezza \cite{singh2011introduction}, Vertica |
| \cite{bear2012vertica}, and EMC’s Greenplum \cite{miner2012unified} offer data warehousing solutions, and |
| several research papers \cite{barroso2009datacenter, |
| chaudhuri1997overview, dewitt1992parallel} directly address this problem as |
| well. As the interactive data exploration space becomes more mature, |
| it is apparent that real-time ingestion and exploration of data will |
| unlock business-critical decisions for front office and back office |
| analysts. |
| |
| Metamarkets realized early on that for high data volumes, existing |
| Relational Database Management Systems (RDBMS) and most NoSQL |
| architectures were not sufficient to address the performance and |
| use-case needs of the business intelligence space. Druid was built to |
| address a gap we believe exists in the current big-data ecosystem for |
| a real-time analytical data store. Druid is a distributed, columnar, |
| shared-nothing data store designed to reliably scale to petabytes of |
| data and thousands of cores. It is a highly available system designed |
| to run on commodity hardware with no downtime in the face of failures, |
| data imports or software updates. We have been building Druid over the |
| course of the last two years and it is the core engine of the |
| Metamarkets technology stack. |
| |
| In many ways, Druid shares similarities with other interactive query systems |
| \cite{melnik2010dremel}, main-memory databases \cite{farber2012sap}, and widely-known distributed data |
| stores such as BigTable \cite{chang2008bigtable}, Dynamo \cite{decandia2007dynamo}, and Cassandra \cite{lakshman2010cassandra}. Unlike |
| most traditional data stores, Druid operates mainly on read-only data |
| and has limited functionality for writes. The system is highly optimized |
| for large-scale transactional data aggregation and arbitrarily deep data exploration. Druid is highly configurable |
| and allows users to adjust levels of fault tolerance and |
| performance. |
| |
| Druid builds on the ideas of other distributed data stores, real-time |
| computation engines, and search engine indexing algorithms. In this |
paper, we make the following contributions:
| \begin{itemize} |
| \item We outline Druid’s real-time ingestion and query capabilities |
| and explain how we can explore events within milliseconds of their |
| creation |
| \item We describe how the architecture allows for fast and flexible |
| queries and how inverted indices can be applied to quickly filter |
| data |
| \item We present experiments benchmarking Druid’s performance |
| \end{itemize} |
The remainder of the paper is organized as follows: Section \ref{sec:data-model} describes the Druid
data model. Section \ref{sec:cluster} presents an overview of the components of a
Druid cluster. Section \ref{sec:query-api} outlines the query API. Section \ref{sec:storage} describes
the data storage format in greater detail. Section \ref{sec:robustness} discusses Druid
| robustness and failure responsiveness. Section \ref{sec:benchmarks} provides our |
| performance benchmarks. Section \ref{sec:related} lists the related work and Section \ref{sec:conclusions} presents |
| our conclusions. |
| |
| \section{Data Model} |
| \label{sec:data-model} |
| The fundamental storage unit in Druid is the segment. Distribution and |
| replication in Druid are always done at a segment level. Segments |
| encapsulate a partition of a larger transactional data set. To better |
| understand how a typical row/column collection of data translates into |
| a Druid segment, consider the data set shown in Table~\ref{tab:sample_data}. |
| |
| \begin{table*} |
| \centering |
| \caption{Sample Druid data} |
| \label{tab:sample_data} |
| \texttt{ |
\begin{tabular}{ l l l l l l l l }
| \hline |
| \textbf{Timestamp} & \textbf{Publisher} & \textbf{Advertiser} & \textbf{Gender} & \textbf{Country} & \textbf{Impressions} & \textbf{Clicks} & \textbf{Revenue} \\ |
| 2011-01-01T01:00:00Z & bieberfever.com & google.com & Male & USA & 1800 & 25 & 15.70 \\ |
| 2011-01-01T01:00:00Z & bieberfever.com & google.com & Male & USA & 2912 & 42 & 29.18 \\ |
| 2011-01-01T02:00:00Z & ultratrimfast.com & google.com & Male & USA & 1953 & 17 & 17.31 \\ |
| 2011-01-01T02:00:00Z & ultratrimfast.com & google.com & Male & USA & 3194 & 170 & 34.01 \\ |
| \end{tabular} |
| } |
| \end{table*} |
| |
| A segment is composed of multiple binary files, each representing a |
| column of a data set. The data set in Table~\ref{tab:sample_data} consists of 8 distinct |
| columns, one of which is the timestamp column. Druid always requires a |
| timestamp column because it (currently) only operates with event-based |
| data. Segments always represent some time interval and each column |
| file contains the specific values for that column over the time |
| interval. Since segments always contain data for a time range, it is |
| logical that Druid partitions data into smaller chunks based on the |
| timestamp value. In other words, segments can be thought of as blocks |
| of data that represent a certain granularity of time. For example, if |
| we wanted to shard the data in Table~\ref{tab:sample_data} to an hourly granularity, the |
| partitioning algorithm would result in two segments, one representing |
| each hour of 2011-01-01. Similarly, if we sharded the data to a daily |
| granularity, we would create a single segment for 2011-01-01. |
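
To make the time-based partitioning concrete, the following sketch
(our illustration, not Druid’s actual code) truncates an event
timestamp to its granularity bucket; events that share a bucket belong
to the same segment interval:
\begin{verbatim}
import java.time.Instant;
import java.time.temporal.ChronoUnit;

class GranularityBucket {
  public static void main(String[] args) {
    Instant event = Instant.parse("2011-01-01T01:33:00Z");
    // Hourly sharding: events in the same hour share a
    // segment interval.
    System.out.println(event.truncatedTo(ChronoUnit.HOURS));
    // -> 2011-01-01T01:00:00Z
    // Daily sharding: the whole day maps to one segment.
    System.out.println(event.truncatedTo(ChronoUnit.DAYS));
    // -> 2011-01-01T00:00:00Z
  }
}
\end{verbatim}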
| |
| Partitioning the data based on granularity buckets allows users to |
fine-tune the degree of parallelization they want in Druid. A data set
representing a year’s worth of data may be bucketed by day, and a data
set representing only a day’s worth of data may be partitioned by
| hour. Sharding on a single dimension (time) may still result in segments that are too large to manage if the data volume is sufficiently high. |
| To create more operable partition chunks, Druid may |
| additionally shard data based on other factors such as dimension |
cardinality. Each shard becomes a segment; hence, segments are uniquely identified by a data source
id describing the data, the time interval of the data, a version
indicating when the segment was created, and a shard partition number.
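
One way to picture this four-part identity is the sketch below; the
class is purely illustrative and is not Druid’s internal
representation:
\begin{verbatim}
// Illustrative only: the four-part segment identity.
class SegmentId {
  final String dataSource;  // which data set
  final String interval;    // time interval the segment covers
  final String version;     // when the segment was created
  final int partitionNum;   // shard number within the interval

  SegmentId(String dataSource, String interval,
            String version, int partitionNum) {
    this.dataSource = dataSource;
    this.interval = interval;
    this.version = version;
    this.partitionNum = partitionNum;
  }

  @Override public String toString() {
    return dataSource + "_" + interval + "_"
        + version + "_" + partitionNum;
  }
}
\end{verbatim}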
| |
| Data in Druid can be conceptually thought of as being either real-time |
| or historical. Real-time data refers to recently ingested data; |
| typically, in a production setting, this will be data for the current |
| hour. Historical data refers to any data that is older. Segments can |
| be similarly classified as being either real-time or historical. |
| |
| Historical segments are immutable and do not support insert, delete, |
| or update semantics. By maintaining immutable blocks of data within |
| the system, we can maintain a consistent snapshot of historical |
| data and provide read consistency without having to worry about |
| concurrent updates and deletes. If updates to a historical segment are |
| required, we build a new segment for the same data source and time |
| interval with the updated data. This new segment will have an updated version identifier. |
| |
| Multiple segments for the same data source and time range |
| may exist in the system at any time. To provide read consistency, |
| Druid read operations always access data in a particular time range |
| from the segment with the latest version identifier for that time |
| range. Historical segments may be stored on local disk or in a |
| key-value “deep” store such as S3 \cite{decandia2007dynamo} or HDFS \cite{shvachko2010hadoop}. All historical |
| segments have associated metadata describing properties of the segment |
| such as size in bytes, compression format, and location in deep |
| storage. |
| |
| Real-time segments are mutable and generally represent a much shorter |
| duration of time than historical segments. Real-time segments contain |
| recent data and are incrementally populated as new events are |
| ingested. On a periodic basis, real-time segments are converted into |
| historical segments. Additional details about this conversion process |
| are given in Section~\ref{sec:realtime}. |
| |
| \section{Cluster} |
| \label{sec:cluster} |
| A Druid cluster consists of different types of nodes, each performing |
| a specific function. The composition of a Druid cluster is shown in |
| Figure~\ref{fig:druid_cluster}. |
| |
| \begin{figure*} |
| \centering |
| \includegraphics[width = 6in]{druid_cluster} |
| \caption{An overview of a Druid cluster.} |
| \label{fig:druid_cluster} |
| \end{figure*} |
| |
| Recall that data in Druid is classified as either real-time or |
| historical. The Druid cluster is architected to reflect this |
| conceptual separation of data. Real-time nodes are responsible for |
| ingesting, storing, and responding to queries for the most recent |
| events. Similarly, historical compute nodes are responsible for |
| loading and responding to queries for historical events. |
| |
| Data in Druid is stored on storage nodes. Storage nodes can be either |
| compute or real-time nodes. Queries to access this data will |
| typically first hit a layer of broker nodes. Broker nodes are |
| responsible for finding and routing queries down to the storage nodes |
| that host the pertinent data. The storage nodes compute their portion |
| of the query response in parallel and return their results to the |
brokers. Broker nodes, compute nodes, and real-time nodes can all be
| classified as queryable nodes. |
| |
| Druid also has a set of coordination nodes to manage load assignment, |
| distribution, and replication. Coordination nodes are not queryable |
| and instead focus on maintaining cluster stability. Coordination nodes |
| have an external dependency on a MySQL database. |
| |
| Druid relies on an Apache Zookeeper \cite{hunt2010zookeeper} cluster |
| for coordination. This dependency is required because there is no |
| direct coordination-related communication between Druid nodes. The |
| following sections will discuss each Druid component in greater |
| detail. |
| |
| \subsection{Apache Zookeeper} |
| Zookeeper is a service for coordinating processes of distributed |
applications. Zookeeper provides connecting applications with an
abstraction of a hierarchy of data nodes known as znodes. Each znode
is part of a hierarchical namespace, similar to a file
system. Zookeeper has the concept of ephemeral and permanent
znodes. Permanent znodes must be created and destroyed explicitly by a
connecting application. Ephemeral znodes can be created by connecting
applications and are deleted either explicitly or when the session that
created the znode terminates (such as in the event of service
failure).
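
As an illustration, a service can announce its liveness with an
ephemeral znode through the standard Zookeeper Java client; the znode
disappears automatically when the session ends. The connection string
and path below are placeholders, not Druid’s actual configuration:
\begin{verbatim}
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

class Announcer {
  public static void main(String[] args) throws Exception {
    ZooKeeper zk =
        new ZooKeeper("zk-host:2181", 30000, event -> {});
    // Ephemeral: removed automatically if this session dies.
    zk.create("/druid/announcements/node-1",
              "serving".getBytes(),
              ZooDefs.Ids.OPEN_ACL_UNSAFE,
              CreateMode.EPHEMERAL);
  }
}
\end{verbatim}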
| |
| \subsection{Historical Compute Nodes} |
| Historical compute nodes are the main workers of a Druid cluster and |
| are self-contained and self-sufficient. Compute nodes load historical |
| segments from permanent/deep storage and expose them for |
| querying. There is no single point of contention between the nodes and |
| nodes have no knowledge of one another. Compute nodes are |
| operationally simple; they only know how to perform the tasks they are |
| assigned. To help other services discover compute nodes and the data |
| they hold, every compute node maintains a constant Zookeeper |
| connection. Compute nodes announce their online state and the segments |
| they serve by creating ephemeral nodes under specifically configured |
| Zookeeper paths. Instructions for a given compute node to load new |
| segments or drop existing segments are sent by creating ephemeral |
| znodes under a special “load queue” path associated with the compute |
| node. |
| |
| To expose a segment for querying, a compute node must first possess a |
| local copy of the segment. Before a compute node downloads a segment |
| from deep storage, it first checks a local disk directory (cache) to |
| see if the segment already exists in local storage. If no cache |
| information about the segment is present, the compute node will |
| download metadata about the segment from Zookeeper. This metadata |
| includes information about where the segment is located in deep |
| storage and about how to decompress and process the segment. Once a |
| compute node completes processing a segment, the node announces (in |
| Zookeeper) that it is serving the segment. At this point, the segment |
| is queryable. |
| |
| \subsubsection{Tiers} |
| \label{sec:tiers} |
| Compute nodes can be grouped in different tiers, where all nodes in a |
| given tier are identically configured. Different performance and |
| fault-tolerance parameters can be set for each tier. The purpose of |
| tiered nodes is to enable higher or lower priority segments to be |
| distributed according to their importance. For example, it is possible |
| to spin up a “hot” tier of compute nodes that have a high number of |
cores and a large RAM capacity. The “hot” tier can be configured to
download more frequently accessed segments. A parallel “cold” tier
can also be created with much less powerful backing hardware. The
“cold” tier would only contain less frequently accessed segments.
| |
| \subsection{Real-time Nodes} |
| \label{sec:realtime} |
| Real-time nodes encapsulate the functionality to ingest and query |
| real-time data streams. Data indexed via these nodes is immediately |
available for querying. Real-time nodes are consumers of data and
| require a corresponding producer to provide the data |
| stream. Typically, for data durability purposes, a message bus such as |
| Kafka \cite{kreps2011kafka} sits between the producer and the real-time node as shown |
| in Figure~\ref{fig:data-ingestion}. |
| |
| \begin{figure} |
| \centering |
| \includegraphics[width = 3in]{druid_message_bus} |
| \caption{Real-time data ingestion.} |
| \label{fig:data-ingestion} |
| \end{figure} |
| |
| The purpose of the message bus in Figure~\ref{fig:data-ingestion} is to act as a buffer for |
| incoming events. The message bus can maintain offsets indicating the |
| position in an event stream that a real-time node has read up to and |
real-time nodes can update these offsets periodically. The message bus also acts as backup storage for recent events.
| Real-time nodes ingest data by reading events from the message bus. The time from event creation to message bus storage to |
| event consumption is on the order of hundreds of milliseconds. |
| |
| Real-time nodes maintain an in-memory index for all incoming |
| events. These indexes are incrementally populated as new events appear on the message bus. The indexes are also directly queryable. |
| Real-time nodes persist their indexes to disk either periodically or after some maximum row limit is |
| reached. After each persist, a real-time node updates the message bus |
| with the offset of the last event of the most recently persisted |
| index. Each persisted index is immutable. If a real-time node fails and recovers, it can simply reload |
| any indexes that were persisted to disk and continue reading the |
| message bus from the point the last offset was committed. Periodically committing offsets reduces the number of messages a real-time |
| node has to rescan after a failure scenario. |
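
The ingest-persist-commit cycle described above can be summarized by
the following loop. The \texttt{MessageBus}, \texttt{Event}, and
\texttt{InMemoryIndex} types are hypothetical stand-ins for the actual
consumer and index implementations, and the row limit is illustrative:
\begin{verbatim}
interface Event { long offset(); }

interface MessageBus {
  Event nextEvent();            // blocks until an event arrives
  void commitOffset(long off);  // checkpoint the read position
}

interface InMemoryIndex {
  void add(Event e);
  int rowCount();
  void persist();  // write an immutable index file to disk
                   // and reset the in-memory rows
}

class RealtimeIngester {
  static final int MAX_ROWS = 500_000;  // illustrative limit

  static void run(MessageBus bus, InMemoryIndex index) {
    long lastOffset = -1;
    while (true) {
      Event e = bus.nextEvent();
      index.add(e);
      lastOffset = e.offset();
      if (index.rowCount() >= MAX_ROWS) {
        index.persist();
        // Committing only after a successful persist bounds
        // how many events must be rescanned after a failure.
        bus.commitOffset(lastOffset);
      }
    }
  }
}
\end{verbatim}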
| |
| Real-time nodes maintain a consolidated view of the currently updating |
| index and of all indexes persisted to disk. This unified view allows |
| all indexes on a node to be queried. On a periodic basis, the nodes will |
| schedule a background task that searches for all persisted indexes of |
| a data source. The task merges these indexes together and builds a |
| historical segment. The nodes will upload the segment to deep storage |
| and provide a signal for the historical compute nodes to begin serving |
| the segment. The ingest, persist, merge, and handoff steps are fluid; |
| there is no data loss as a real-time node converts a real-time segment |
| to a historical one. Figure~\ref{fig:data-durability} illustrates the process. |
| |
| \begin{figure} |
| \centering |
| \includegraphics[width = 3in]{druid_realtime_flow} |
\caption{Real-time data durability.}
| \label{fig:data-durability} |
| \end{figure} |
| |
| Similar to compute nodes, real-time nodes announce segments in |
| Zookeeper. Unlike historical segments, real-time segments may |
| represent a period of time that extends into the future. For example, |
| a real-time node may announce it is serving a segment that contains |
| data for the current hour. Before the end of the hour, the real-time |
| node continues to collect data for the hour. Every 10 minutes (the |
| persist period is configurable), the node will flush and persist its |
| in-memory index to disk. At the end of the current hour, the real-time |
| node prepares to serve data for the next hour by creating a new index |
and announcing a new segment for the next hour. The node does not
merge and build a historical segment for the previous hour until some
window period has passed. Having a window period
| allows for straggling data points to come in and minimizes the risk of |
| data loss. At the end of the window period, the real-time node will |
| merge all persisted indexes, build a historical segment for the |
| previous hour, and hand the segment off to historical nodes to |
| serve. Once the segment is queryable on the historical nodes, the |
real-time node flushes all information about the segment and
unannounces it.
| |
| Real-time nodes are highly scalable. If the data volume and ingestion |
| rates for a given data source exceed the maximum capabilities of a |
| single node, additional nodes can be added. Multiple nodes can |
| consume events from the same stream, and every individual node only |
| holds a portion of the total number of events. This creates natural |
| partitions across nodes. Each node announces the real-time segment it |
| is serving and each real-time segment has a partition number. Data |
| from individual nodes will be merged at the Broker level. To our |
| knowledge, the largest production level real-time Druid cluster is |
| consuming approximately 2 TB of raw data per hour. |
| |
| \subsection{Broker Nodes} |
| Broker nodes act as query routers to other queryable nodes such as |
| compute and real-time nodes. Broker nodes understand the metadata |
| published in Zookeeper about what segments exist and on what nodes the |
| segments are stored. Broker nodes route incoming queries such that the queries hit |
| the right storage nodes. Broker nodes also merge partial results from |
| storage nodes before returning a final consolidated result to the |
| caller. Additionally, brokers provide an extra level of data |
| durability as they maintain a cache of recent results. In the event |
| that multiple storage nodes fail and all copies of a segment are |
somehow lost, segment results can still be
returned if that information exists in the cache.
| |
| \subsubsection{Timeline} |
| To determine the correct nodes to forward queries to, Broker nodes |
| first build a view of the world from information in Zookeeper. Recall |
| that Druid uses Zookeeper to maintain information about all compute |
| and real-time nodes in a cluster and the segments those nodes are |
| serving. For every data source in Zookeeper, the Broker node builds a |
timeline of segments for the data source and the nodes that serve them. A timeline
represents which segments contain data for
which ranges of time. Druid may have multiple segments where the data
| source and interval are the same but versions differ. The timeline |
| view will always surface segments with the most recent version |
identifier for a time range. If two segments’ intervals overlap, the segment with the more recent
| version always has precedence. When queries are received for a specific |
| data source and interval, the Broker node performs a lookup on the |
| timeline associated with the query data source for the query interval |
| and retrieves the segments that contain data for the query. The broker |
| node maps these segments to the storage nodes that serve them and |
| forwards the query down to the respective nodes. |
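
A minimal sketch of this version resolution follows; it ignores
partially overlapping intervals, which a production timeline must also
handle:
\begin{verbatim}
import java.util.HashMap;
import java.util.Map;

// Sketch: resolve each interval to its newest version.
class Timeline {
  // interval -> version of the newest segment covering it
  private final Map<String, String> latest = new HashMap<>();

  void announce(String interval, String version) {
    latest.merge(interval, version,
        (cur, inc) -> inc.compareTo(cur) > 0 ? inc : cur);
  }

  String lookup(String interval) {
    return latest.get(interval);  // newest version, or null
  }
}
\end{verbatim}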
| |
| |
| \subsubsection{Caching} |
| \label{sec:caching} |
Broker nodes employ a distributed cache with an LRU \cite{o1993lru,
| kim2001lrfu} cache invalidation strategy. The broker cache stores |
| per segment results. The cache can be local to each broker node or |
| shared across multiple nodes using an external distributed cache |
| such as memcached \cite{fitzpatrick2004distributed}. Recall that each time a broker node receives a |
| query, it first maps the query to a set of segments. A subset of |
| these segment results may already exist in the cache and the results |
| can be directly pulled from the cache. For any segment results that |
| do not exist in the cache, the broker node will forward the query |
| to the compute nodes. Once the compute nodes return their results, |
| the broker will store those results in the cache. Real-time segments |
| are never cached and hence requests for real-time data will always |
| be forwarded to real-time nodes. Real-time data is perpetually |
| changing and caching the results would be unreliable. |
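
In outline, the per-segment cache check behaves like the sketch below;
the key and result types are placeholder strings rather than Druid’s
actual classes:
\begin{verbatim}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative broker-side, per-segment cache check.
class BrokerCache {
  private final Map<String, String> cache = new HashMap<>();

  List<String> query(List<String> segmentIds) {
    List<String> results = new ArrayList<>();
    for (String id : segmentIds) {
      String cached = cache.get(id);
      if (cached == null) {
        // Cache miss: compute on the storage nodes, then
        // store the per-segment result for future queries.
        cached = forwardToStorageNode(id);
        cache.put(id, cached);
      }
      results.add(cached);
    }
    return results;
  }

  String forwardToStorageNode(String segmentId) {
    return "...";  // stands in for a remote partial result
  }
}
\end{verbatim}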
| |
| |
| \subsection{Coordination (Master) Nodes} |
| The Druid coordination or master nodes are primarily in charge of |
| segment management and distribution. The Druid master is responsible |
| for loading new segments, dropping outdated segments, managing segment |
| replication, and balancing segment load. Druid uses a multi-version |
| concurrency control swapping protocol for managing segments in order |
| to maintain stable views. |
| |
| The Druid master runs periodically to determine the current state of |
| the cluster. It makes decisions by comparing the expected state of the |
| cluster with the actual state of the cluster at the time of the |
| run. As with all Druid nodes, the Druid master maintains a connection |
| to Zookeeper for current cluster information. The master also |
| maintains a connection to a MySQL database that contains additional |
| operational parameters and configurations. One of the key pieces of |
| information located in the MySQL database is a segment table that |
| contains a list of historical segments that should be served. This |
| table can be updated by any service that creates historical |
| segments. The MySQL database also contains a rule table that governs |
| how segments are created, destroyed, and replicated in the cluster. |
| |
| The master does not directly communicate with a compute node when |
| assigning it work; instead the master creates an ephemeral znode in |
| Zookeeper containing information about what the compute node should |
| do. The compute node maintains a similar connection to Zookeeper to |
| monitor for new work. |
| |
| \subsubsection{Rules} |
Historical segments are loaded into and dropped from the cluster based on a
| set of rules. Rules indicate how segments should be assigned to |
| different compute node tiers and how many replicates of a segment |
| should exist in each tier. Rules may also indicate when segments |
| should be dropped entirely from the cluster. The master loads a set of |
| rules from a rule table in the MySQL database. Rules may be specific |
| to a certain data source and/or a default set of rules can be |
| configured. The master will cycle through all available segments and |
| match each segment with the first rule that applies to it. |
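
First-match rule evaluation can be sketched as follows; the
\texttt{Rule} interface here is a hypothetical stand-in, not Druid’s
actual rule API:
\begin{verbatim}
import java.util.List;

interface Rule {
  boolean appliesTo(String segmentId);
  void apply(String segmentId);  // load, replicate, or drop
}

class RuleRunner {
  static void run(List<Rule> rules, List<String> segments) {
    for (String segment : segments) {
      for (Rule rule : rules) {
        if (rule.appliesTo(segment)) {
          rule.apply(segment);
          break;  // only the first matching rule applies
        }
      }
    }
  }
}
\end{verbatim}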
| |
\subsubsection{Load Balancing}
\label{sec:balancing}
| In a typical production environment, queries often hit dozens or even |
| hundreds of data segments. Since each compute node has limited |
| resources, historical segments must be distributed among the cluster |
| to ensure that the cluster load is not too imbalanced. Determining |
| optimal load distribution requires some knowledge about query patterns |
| and speeds. Typically, queries cover recent data spanning contiguous |
| time intervals for a single data source. On average, queries that |
| access smaller segments are faster. |
| |
| These query patterns suggest replicating recent historical segments at |
| a higher rate, spreading out large segments that are close in time to |
| different compute nodes, and co-locating segments from different data |
| sources. To optimally distribute and balance segments among the |
| cluster, we developed a cost-based optimization procedure that takes |
| into account the segment data source, recency, and size. The exact |
| details of the algorithm are beyond the scope of this paper. |
| |
| \section{Query API} |
| \label{sec:query-api} |
Queries to Druid are made in the form of HTTP POST requests. All queryable
| Druid nodes share the same query API. The body of the POST request is |
| a JSON object containing key-value pairs specifying various query |
| parameters. A typical query will contain the data source name, the |
| granularity of the result data, the time range of interest, and the |
| type of request. A sample time series query is shown below: |
| \begin{verbatim} |
| { |
| "queryType" : "timeseries", |
| "dataSource" : "sample_data", |
| "intervals" : "2013-01-01/2013-01-02", |
| "filter" : { |
| "type" : "selector", |
| "dimension" : "poets", |
| "value" : "Ke$ha" |
| }, |
| "granularity" : "day", |
| "aggregations" : [ |
| { |
| "type" : "count", |
| "fieldName" : "row", |
| "name" : "row" |
| } |
| ] |
| } |
| \end{verbatim} |
| |
| Certain query types will also support a filter set. A filter set is an |
| arbitrary Boolean expression of dimension name and value |
| pairs. Support for complex nested filter sets enables flexibility and |
| provides the ability to deeply explore data. |
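
As a hypothetical example in the same JSON form as the query above, a
nested filter over the dimensions of Table~\ref{tab:sample_data} might
replace the \texttt{filter} clause as follows (the composite
\texttt{and}/\texttt{or} types shown are assumptions consistent with
the selector syntax above):
\begin{verbatim}
"filter" : {
  "type" : "or",
  "fields" : [
    { "type" : "selector",
      "dimension" : "publisher",
      "value" : "bieberfever.com" },
    { "type" : "and",
      "fields" : [
        { "type" : "selector",
          "dimension" : "gender",
          "value" : "Male" },
        { "type" : "selector",
          "dimension" : "country",
          "value" : "USA" }
      ] }
  ]
}
\end{verbatim}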
| |
| The exact query syntax depends on the query type and the information requested. |
| It is beyond the scope of this paper to describe the Query API in full detail. |
| We are also in the process of building out SQL support for Druid. |
| |
| \section{Storage} |
| \label{sec:storage} |
| Druid is a column-oriented data store. When considering aggregates |
over a large number of events, the advantages of storing data as columns
rather than rows are well documented \cite{cattell2011scalable}. Column storage allows for
more efficient CPU usage as only what is needed is actually loaded and
scanned. In a row-oriented data store, all columns associated with a
| row must be scanned as part of an aggregation. The additional scan |
| time can introduce performance degradations as high as 250\% \cite{bear2012vertica}. |
| |
| \subsection{Column Types} |
| Druid has multiple column types to represent the various column value |
| formats. Depending on the column type, different compression methods |
| are used to reduce the cost of storing a column in memory and on |
| disk. In the example given in Table~\ref{tab:sample_data}, the |
| publisher, advertiser, gender, and country columns only contain |
| strings. String columns can be dictionary encoded. Dictionary encoding |
| is a common method to compress data and has been used in other data |
| stores such as Powerdrill \cite{hall2012processing}. In the example in |
Table~\ref{tab:sample_data}, we can map each publisher to a unique
| integer identifier. |
| \begin{verbatim} |
| bieberfever.com -> 0 |
| ultratrimfast.com -> 1 |
| \end{verbatim} |
| This mapping allows us to represent the publisher column as an integer |
| array where the array indices correspond to the rows of the original |
| data set. For the publisher column, we can represent the unique |
| publishers as follows: |
| \begin{verbatim} |
| [0, 0, 1, 1] |
| \end{verbatim} |
| |
| The resulting integer array lends itself very well to |
| compression. Generic compression algorithms on top of encodings are |
| very common in column-stores. We opted to use the LZF \cite{liblzf2013} compression |
| algorithm. |
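
A dictionary encoder for a string column can be written in a few
lines; the sketch below is an illustration of the technique, not
Druid’s implementation:
\begin{verbatim}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Dictionary-encode a string column into an integer array.
class DictionaryEncoder {
  final Map<String, Integer> dictionary = new HashMap<>();
  final List<Integer> encoded = new ArrayList<>();

  void add(String value) {
    Integer id = dictionary.get(value);
    if (id == null) {          // first time value is seen:
      id = dictionary.size();  // assign the next integer id
      dictionary.put(value, id);
    }
    encoded.add(id);
  }
}
\end{verbatim}
Feeding the four publisher values from Table~\ref{tab:sample_data}
through this encoder reproduces the \texttt{[0, 0, 1, 1]} array shown
above.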
| |
| We can leverage similar compression algorithms for numeric |
| columns. For example, the clicks and revenue columns in |
| Table~\ref{tab:sample_data} can also be expressed as individual |
| arrays. |
| \begin{verbatim} |
| Clicks -> [25, 42, 17, 170] |
| Revenue -> [15.70, 29.18, 17.31, 34.01] |
| \end{verbatim} |
| In this case we compress the raw values as opposed to their dictionary |
| representations. |
| |
| \subsection{Filters} |
| To support arbitrary filter sets, Druid creates additional lookup |
| indices for string columns. These lookup indices are compressed and |
| Druid operates over the indices in their compressed form. Filters can |
be expressed as Boolean equations of multiple lookup indices. Performing Boolean
operations on indices in their compressed form is both performant and
space efficient.
| |
| Let us consider the publisher column in |
| Table~\ref{tab:sample_data}. For each unique publisher in |
| Table~\ref{tab:sample_data}, we can form some representation |
indicating in which table rows a particular publisher appears. We can
| store this information in a binary array where the array indices |
| represent our rows. If a particular publisher is seen in a certain |
| row, that array index is marked as \texttt{1}. For example: |
| \begin{verbatim} |
| bieberfever.com -> rows [0, 1] -> [1][1][0][0] |
| ultratrimfast.com -> rows [2, 3] -> [0][0][1][1] |
| \end{verbatim} |
| |
\texttt{bieberfever.com} is seen in rows \texttt{0} and \texttt{1}. This mapping of column values
to row indices forms an inverted index \cite{tomasic1993performance}. If we want to know which
rows contain \texttt{bieberfever.com} or \texttt{ultratrimfast.com}, we can \texttt{OR} together
the \texttt{bieberfever.com} and \texttt{ultratrimfast.com} arrays.
\begin{verbatim}
[1][1][0][0] OR [0][0][1][1] = [1][1][1][1]
\end{verbatim}
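
Using the JDK’s \texttt{BitSet} as a stand-in for a compressed bitmap
(Druid itself uses CONCISE sets, discussed below), the \texttt{OR}
above can be reproduced directly:
\begin{verbatim}
import java.util.BitSet;

class InvertedIndexDemo {
  public static void main(String[] args) {
    BitSet bieberfever = new BitSet(4);
    bieberfever.set(0);      // publisher seen in row 0
    bieberfever.set(1);      // ... and in row 1
    BitSet ultratrimfast = new BitSet(4);
    ultratrimfast.set(2);
    ultratrimfast.set(3);

    // Rows matching either publisher.
    BitSet either = (BitSet) bieberfever.clone();
    either.or(ultratrimfast);
    System.out.println(either);  // {0, 1, 2, 3}
  }
}
\end{verbatim}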
| |
| \begin{figure} |
| \centering |
| \includegraphics[width = 3in]{concise_plot} |
| \caption{Integer array size versus CONCISE set size.} |
| \label{fig:concise_plot} |
| \end{figure} |
| |
| This approach of performing Boolean operations on large bitmap sets is |
| commonly used in search engines. Bitmap compression algorithms are a |
well-studied area of research and often utilize run-length
| encoding. Well known algorithms include Byte-aligned Bitmap Code \cite{antoshenkov1995byte}, |
| Word-Aligned Hybrid (WAH) code \cite{wu2006optimizing}, and Partitioned Word-Aligned |
| Hybrid (PWAH) compression \cite{van2011memory}. Druid opted to use the CONCISE |
| algorithm \cite{colantonio2010concise} as it can outperform WAH by reducing the size of the |
| compressed bitmaps by up to 50\%. Figure~\ref{fig:concise_plot} illustrates the number of |
| bytes using CONCISE compression versus using an integer array. The |
| results were generated on a cc2.8xlarge system with a single thread, |
| 2G heap, 512m young gen, and a forced GC between each run. The data |
| set is a single day’s worth of data collected from the Twitter garden |
| hose \cite{twitter2013} data stream. The data set contains 2,272,295 rows and 12 |
| dimensions of varying cardinality. As an additional comparison, we |
also re-sorted the data set rows to maximize compression.
| |
| In the unsorted case, the total CONCISE compressed size was 53,451,144 |
| bytes and the total integer array size was 127,248,520 bytes. Overall, |
| CONCISE compressed sets are about 42\% smaller than integer arrays. |
| In the sorted case, the total CONCISE compressed size was 43,832,884 |
bytes and the total integer array size was 127,248,520 bytes. Interestingly,
after sorting, the overall compression improved only
minimally: the ratio of total CONCISE set size to total integer array
size dropped to 34\%. It is also worth noting that as the cardinality
| of a dimension approaches the total number of rows in a data set, |
| integer arrays actually require less space than CONCISE sets. |
| |
| \subsection{Storage Engine} |
Druid’s persistence components allow for different storage engines to
| be plugged in, similar to Dynamo \cite{decandia2007dynamo}. These storage engines may store |
| data in in-memory structures such as the JVM heap or in memory mapped |
| structures. The ability to swap storage engines allows for Druid to be |
| configured depending on a particular application’s specifications. An |
| in-memory storage engine may be operationally more expensive than a |
| memory-mapped storage engine but could be a better alternative if |
| performance is critical. At Metamarkets, we commonly use a |
| memory-mapped storage engine. |
| |
| \section{Robustness and Fault-Tolerance} |
| \label{sec:robustness} |
| To achieve high system availability and data durability, Druid employs |
| several fault recovery techniques. Druid has no single point of |
| failure. |
| |
| \subsection{Replication} |
| Druid replicates historical segments on multiple hosts. The number of |
| replicates in each tier of the historical compute cluster is fully |
| configurable. Setups that require high levels of fault tolerance can |
| be configured to have a high number of replicates. Replicates are |
| assigned to compute nodes by coordination nodes using the same load |
distribution algorithm discussed in Section~\ref{sec:balancing}. Conceptually,
| broker nodes do not distinguish historical segments from their |
replicates. A broker node forwards a query to the first node it finds
that contains data for that query.
| |
| Real-time segments follow a different replication model as real-time |
| segments are mutable. Recall that real-time nodes act as consumers of |
| a data stream. Multiple real-time nodes can read from the same message |
| bus if each maintains a unique offset, hence creating multiple copies |
| of a real-time segment. If a real-time node fails and recovers, it can |
| reload any indexes that were persisted to disk and read from the |
| message bus from the point it last committed an offset. |
| |
| \subsection{Local Segment Cache} |
| Recall that each Druid compute node maintains a local cache of |
| historical segments it recently served. A compute node also has a |
| lookup table for segments it has in its cache and stores this lookup |
| table on disk. When a compute node is assigned a new segment to load, |
| the compute node will first check its local segment cache directory to |
| see if the segment had been previously downloaded. If a cache entry |
| exists, the compute node will directly read the segment binary files |
| and load the segment. |
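
In outline, segment loading follows a simple check-then-fetch pattern;
the helper method and paths in this sketch are illustrative
placeholders:
\begin{verbatim}
import java.io.File;

// Serve from the local cache when possible, otherwise
// fetch the segment from deep storage.
class SegmentLoader {
  File load(String segmentId, File cacheDir) {
    File local = new File(cacheDir, segmentId);
    if (!local.exists()) {
      // Cache miss: pull the segment's binary files down
      // from deep storage (e.g. S3 or HDFS).
      downloadFromDeepStorage(segmentId, local);
    }
    return local;  // segment files, ready to serve
  }

  void downloadFromDeepStorage(String id, File dest) {
    // illustrative placeholder
  }
}
\end{verbatim}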
| |
| The segment cache is also leveraged when a compute node is initially |
| started. During startup, a compute node will first read its lookup |
| table to determine what segments it has cached. All cached segments |
| are immediately loaded and served. This feature introduces minimal |
| overhead and allows a compute node to readily serve data as soon as it |
| comes online. By making data quickly available on startup and |
| minimizing initial startup time, compute nodes that become |
| inexplicably disconnected from the cluster can reconnect themselves |
| seamlessly. This also means that software updates can be pushed to |
| compute nodes without disruption to cluster operations. In practice, a |
| software update to a compute node can be completed before coordination |
| nodes even notice that the compute node has disconnected. At |
| Metamarkets, we update Druid through rolling restarts. Compute nodes |
| are updated one at a time and we experience no downtime or data loss |
| through the update process. |
| |
| \subsection{Failure Detection} |
| If a compute node completely fails and becomes unavailable, the |
| ephemeral Zookeeper znodes it created are deleted. The master node |
| will notice that certain segments are insufficiently replicated or |
| missing altogether. Additional replicates will be created and |
| redistributed throughout the cluster. |
| |
| We are moving towards building out infrastructure to support |
| programmatic creation of real-time nodes. In the near future, the |
| master node will also notice if real-time segments are insufficiently |
| replicated and automatically create additional real-time nodes as |
| redundant backups. |
| |
| \subsection{Adding and Removing Nodes} |
Adding and removing Druid nodes is a relatively simple process; all
| that is required is to start and stop Java processes. There is minimal |
| operational overhead with adding nodes in batches. Scaling down the |
| cluster is usually done one node at a time with some time lapse |
between shutdowns. This gives the master ample time to
| redistribute load and create additional replicates. Shutting down |
| nodes in batches is generally not supported as it may destroy all |
| copies of a segment, which would lead to data loss. |
| |
| \section{Performance Benchmarks} |
| \label{sec:benchmarks} |
| To benchmark Druid performance, we created a large test cluster with |
| 6TB of uncompressed data, representing tens of billions of fact |
| rows. The data set contained more than a dozen dimensions, with |
| cardinalities ranging from the double digits to tens of millions. We computed |
four metrics for each row (counts, sums, and averages). The data was
sharded first on timestamp and then on dimension values, creating
thousands of shards of roughly 8 million fact rows apiece.
| |
| The cluster used in the benchmark consisted of 100 historical compute |
| nodes, each with 16 cores, 60GB of RAM, 10 GigE Ethernet, and 1TB of |
disk space. Collectively, the cluster comprised 1600 cores, 6TB of
RAM, fast Ethernet, and more than enough disk space.
| |
SQL statements describing each of the benchmark queries are included
in Table~\ref{tab:sql_queries}. Please note:
| \begin{itemize} |
| \item The timestamp range of the queries encompassed all data. |
| \item Each machine was a 16-core machine with 60GB RAM and 1TB of local |
| disk. The machine was configured to only use 15 threads for |
| processing queries. |
\item A memory-mapped storage engine was used (the machine was configured to memory map the data
instead of loading it into the Java heap).
| \end{itemize} |
| |
| \begin{figure} |
| \centering |
| \includegraphics[width = 3in]{cluster_scan_rate} |
| \caption{Druid cluster scan rate with lines indicating linear scaling |
| from 25 nodes.} |
| \label{fig:cluster_scan_rate} |
| \end{figure} |
| |
| \begin{figure} |
| \centering |
| \includegraphics[width = 3in]{core_scan_rate} |
| \caption{Druid core scan rate.} |
| \label{fig:core_scan_rate} |
| \end{figure} |
| |
| \begin{table*} |
| \centering |
| \caption{Druid Queries} |
| \label{tab:sql_queries} |
| \begin{verbatim} |
| 1 SELECT count(*) FROM _table_ WHERE timestamp >= ? AND timestamp < ? |
| |
| 2 SELECT count(*), sum(metric1) FROM _table_ WHERE timestamp >= ? AND timestamp < ? |
| |
| 3 SELECT count(*), sum(metric1), sum(metric2), sum(metric3), sum(metric4) FROM _table_ |
| WHERE timestamp >= ? AND timestamp < ? |
| |
| 4 SELECT high_card_dimension, count(*) AS cnt FROM _table_ |
| WHERE timestamp >= ? AND timestamp < ? |
GROUP BY high_card_dimension ORDER BY cnt LIMIT 100
| |
| 5 SELECT high_card_dimension, count(*) AS cnt, sum(metric1) FROM _table_ |
| WHERE timestamp >= ? AND timestamp < ? |
GROUP BY high_card_dimension ORDER BY cnt LIMIT 100
| |
| 6 SELECT high_card_dimension, count(*) AS cnt, sum(metric1), sum(metric2), sum(metric3), sum(metric4) |
| FROM _table_ WHERE timestamp >= ? AND timestamp < ? |
GROUP BY high_card_dimension ORDER BY cnt LIMIT 100
| \end{verbatim} |
| \end{table*} |
| |
| |
| Figure~\ref{fig:cluster_scan_rate} shows the cluster scan rate and |
| Figure~\ref{fig:core_scan_rate} shows the core scan rate. In |
| Figure~\ref{fig:cluster_scan_rate} we also include projected linear |
scaling based on the results of the 25-node cluster. In particular,
we observe diminishing marginal returns to performance in the size of
the cluster. Under linear scaling, SQL query 1 would have achieved a
speed of 37 billion rows per second on our 75-node cluster. In fact,
| the speed was 26 billion rows per second. The speed up of a parallel |
| computing system is often limited by the time needed for the |
| sequential operations of the system, in accordance with Amdahl's law |
| \cite{amdahl1967validity}. |
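
As a rough, back-of-the-envelope illustration using the numbers above:
growing the cluster from 25 to 75 nodes produced a speedup of
$26/(37/3) \approx 2.1\times$ rather than the ideal $3\times$. Solving
Amdahl’s law for the parallelizable fraction $p$ of the work,
\begin{displaymath}
2.1 = \frac{1}{(1-p) + p/3}
\quad\Longrightarrow\quad p \approx 0.79,
\end{displaymath}
suggests that roughly one fifth of the per-query work behaved serially
at this scale. This is an estimate implied by the measured scan rates,
not a direct measurement of the serial component.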
| |
| The first query listed in Table~\ref{tab:sql_queries} is a simple |
| count, achieving scan rates of 33M rows/second/core. We believe |
the 75-node cluster was actually overprovisioned for the test
data set, explaining the modest improvement over the 50-node cluster.
| Druid's concurrency model is based on shards: one thread will scan one |
| shard. If the number of segments on a compute node modulo the number |
| of cores is small (e.g. 17 segments and 15 cores), then many of the |
| cores will be idle during the last round of the computation. |
| |
| When we include more aggregations we see performance degrade. This is |
| because of the column-oriented storage format Druid employs. For the |
| \texttt{count(*)} queries, Druid only has to check the timestamp column to satisfy |
| the ``where'' clause. As we add metrics, it has to also load those metric |
| values and scan over them, increasing the amount of memory scanned. |
| |
| \section{Related Work} |
| \label{sec:related} |
Cattell \cite{cattell2011scalable} maintains a thorough summary of existing Scalable SQL and
NoSQL data stores. In the landscape of distributed data stores, Druid
sits, feature-wise, somewhere between Google’s Dremel \cite{melnik2010dremel} and Powerdrill
\cite{hall2012processing}. Druid implements most of the features of Dremel (Dremel
| handles arbitrary nested data structures while Druid only allows for a |
| single level of array-based nesting) and many of the interesting |
| compression algorithms mentioned in Powerdrill. |
| |
| Although Druid builds on many of the same principles as other |
| distributed column-oriented data stores \cite{fink2012distributed}, most existing data |
| stores are designed to be key-value stores \cite{lerner2010redis}, or |
| document/extensible record stores \cite{stonebraker2005c}. Such data stores are great |
| solutions for traditional data warehouse needs and general |
| back-office/reporting usage. Typically, analysts will query these data |
| stores and build reports from the results. In-memory databases such as |
| SAP’s HANA \cite{farber2012sap} and VoltDB \cite{voltdb2010voltdb} are examples of other data stores that |
| are highly suited for traditional data warehousing needs. Druid is a |
| front-office system designed such that user-facing dashboards can be |
| built on top of it. Similar to \cite{paraccel2013}, Druid has analytical features |
| built in. The main features Druid offers over traditional data |
| warehousing solutions are real-time data ingestion, interactive |
| queries and interactive query latencies. In terms of real-time |
| ingestion and processing of data, Trident/Storm \cite{marz2013storm} and Streaming |
| Spark \cite{zaharia2012discretized} are other popular real-time computation systems, although |
they lack the data storage capabilities of Druid. Spark/Shark \cite{engle2012shark} is
also doing similar work in the area of fast data analysis on large-scale
data sets. Cloudera Impala \cite{cloudera2013} is another system focused on
optimizing query performance, but more so in Hadoop environments.
| |
| Druid leverages a unique combination of algorithms in its |
| architecture. Although we believe no other data store has the same set |
| of functionality as Druid, some of Druid’s features such as using |
| inverted indices to perform faster filters also exist in other data |
| stores \cite{macnicol2004sybase}. |
| |
| \section{Conclusions} |
| \label{sec:conclusions} |
| In this paper, we presented Druid, a distributed, column-oriented, |
| real-time analytical data store. Druid is a highly customizable |
| solution that is optimized for fast query latencies. Druid ingests |
| data in real-time and is fault-tolerant. We discussed the performance |
of Druid on billion-row data sets. We summarized key Druid architecture
aspects such as the storage format, query language, and general query
execution. In the future, we plan to describe in more depth the different
algorithms we have developed for Druid and how other systems may plug
into Druid to achieve powerful use cases.
| |
| \section{Acknowledgements} |
| \label{sec:acknowledgements} |
| We want to thank Steve Harris and Jaypal Sethi for their feedback on improving this paper. |
| We also want to give recognition to Adam Smith; without him the first Metamarkets hackathon would not have |
been organized and this paper would not have been created. Special recognition also goes to Katherine Chu for
helping to create all the images in this paper.
| |
| Druid could not have been built without the help of many great |
| engineers at Metamarkets and in the community. We want to thank Danny Yuan, Jae Hyeon Bae, Paul Baclace, Dave |
| Nielsen, and Dhruv Parthasarathy for their |
| contributions to Druid. |
| |
| % The following two commands are all you need in the |
| % initial runs of your .tex file to |
| % produce the bibliography for the citations in your paper. |
| \bibliographystyle{abbrv} |
| \bibliography{druid} % druid.bib is the name of the Bibliography in this case |
| % You must have a proper ".bib" file |
| % and remember to run: |
| % latex bibtex latex latex |
| % to resolve all references |
| |
| %Generated by bibtex from your ~.bib file. Run latex, |
| %then bibtex, then latex twice (to resolve references). |
| |
| %APPENDIX is optional. |
| % ****************** APPENDIX ************************************** |
| % Example of an appendix; typically would start on a new page |
| %pagebreak |
| |
| \end{document} |