In this post, we will cover the creation of web applications with realtime visualizations using Druid, Ruby/Python and D3.js. Complete code in Ruby and Python for this example is available at https://github.com/rjurney/druid-application-development.
For more information on the Ruby and Python Druid clients, see here and here. For more information on starting a Druid realtime node, see here.
Our Python Flask application is simple enough. One route serves our HTML/CSS/JavaScript, and another serves JSON to our chart. The fetch_data function runs our Druid query via the pyDruid package.
from flask import Flask, render_template
import json
import re
from pydruid.client import *
# Setup Flask
app = Flask(__name__)
# Druid Config
endpoint = 'druid/v2/?pretty'
demo_bard_url = 'http://localhost:8083'
dataSource = 'webstream'
# Boot a Druid
query = pyDruid(demo_bard_url, endpoint)
# Display our HTML Template
@app.route("/time_series")
def time_series():
    return render_template('index.html')
# Fetch our data from Druid
def fetch_data(start_iso_date, end_iso_date):
    intervals = [start_iso_date + "/" + end_iso_date]
    counts = query.timeseries(dataSource=dataSource,
                              granularity="second",
                              intervals=intervals,
                              aggregations={"count": doubleSum("rows")})
    json_data = json.dumps(counts)
    return json_data
# Deliver data in JSON to our chart
@app.route("/time_series_data/<start_iso_date>/<end_iso_date>")
def time_series_data(start_iso_date, end_iso_date):
    return fetch_data(start_iso_date, end_iso_date)
if __name__ == "__main__":
    app.run(debug=True)
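For orientation, the timeseries call above corresponds roughly to a native Druid JSON query posted to the query endpoint. The sketch below builds such a body by hand. The helper name build_timeseries_query and the example timestamps are hypothetical, and the exact JSON that pyDruid sends may differ between versions.

```python
import json


def build_timeseries_query(data_source, start_iso_date, end_iso_date):
    """Build a native Druid timeseries query as a plain dict (hypothetical helper)."""
    return {
        "queryType": "timeseries",
        "dataSource": data_source,
        # One result bucket per second, as in the Flask app above
        "granularity": "second",
        # Druid intervals are ISO 8601 "start/end" strings
        "intervals": [start_iso_date + "/" + end_iso_date],
        # Sum the "rows" field and expose it under the name "count"
        "aggregations": [
            {"type": "doubleSum", "name": "count", "fieldName": "rows"}
        ],
    }


if __name__ == "__main__":
    # Made-up example window; any ISO 8601 timestamps would do
    body = build_timeseries_query(
        "webstream", "2013-04-01T12:00:00Z", "2013-04-01T12:01:00Z"
    )
    print(json.dumps(body, indent=2))
```

Seeing the raw query makes it easier to debug a chart that stays empty: the same body can be posted to the endpoint directly to check whether Druid returns any rows for the window.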
Our Ruby application, built with Sinatra and ruby-druid, is similar. First we set up some Sinatra configuration variables, and then repeat the work above:
# index.rb
require 'sinatra'
require 'druid'
require 'json'
set :public_folder, File.dirname(__FILE__) + '/static'
set :views, 'templates'
client = Druid::Client.new('', {:static_setup => { 'realtime/webstream' => 'http://localhost:8083/druid/v2/' }})
def fetch_data(client, start_iso_date, end_iso_date)
  query = Druid::Query.new('realtime/webstream').time_series().double_sum(:rows).granularity(:second).interval(start_iso_date, end_iso_date)
  result = client.send(query)
  counts = result.map {|r| {'timestamp' => r.timestamp, 'result' => r.row}}
  JSON.generate(counts)
end
get '/time_series' do
  erb :index
end
get '/time_series_data/:start_iso_date/:end_iso_date' do |start_iso_date, end_iso_date|
  fetch_data(client, start_iso_date, end_iso_date)
end
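The Ruby handler returns a JSON array of records, each with a timestamp key and a result key, and the chart code reads result by aggregation name. As a rough sketch of that contract, the Python below parses such a payload into chart points. The sample payload is made up, the to_points helper is hypothetical, and the millisecond timestamp format is an assumption about what the backend emits.

```python
import json
from datetime import datetime, timezone

# Made-up sample in the shape the chart expects: one record per second
SAMPLE_PAYLOAD = json.dumps([
    {"timestamp": "2013-04-01T12:00:00.000Z", "result": {"count": 12.0}},
    {"timestamp": "2013-04-01T12:00:01.000Z", "result": {"count": 7.0}},
])


def to_points(payload):
    """Turn a JSON payload of timeseries records into chart points."""
    points = []
    for record in json.loads(payload):
        # Assumed timestamp format: ISO 8601 with milliseconds and a Z suffix
        date = datetime.strptime(
            record["timestamp"], "%Y-%m-%dT%H:%M:%S.%fZ"
        ).replace(tzinfo=timezone.utc)
        # "count" is the aggregation name chosen in the query
        points.append({"date": date, "close": record["result"]["count"]})
    return points


if __name__ == "__main__":
    for point in to_points(SAMPLE_PAYLOAD):
        print(point["date"].isoformat(), point["close"])
```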
The meat of our application is in JavaScript, using the d3.js library. The complete code is here and a working JSFiddle is here. Commented code highlights are below:
// Made possible only with help from Vadim Ogeivetsky
var data = [];
var maxDataPoints = 20; // Max number of points to keep in the graph
var nextData = data;
var dataToShow = [];
setInterval(function() {
  data = nextData;
  // Skip when nothing more to show
  if (dataToShow.length == 0) return;
  // Take one datum from the new data and add it to the data
  // (pretend the data is arriving one point at a time)
  data.push(dataToShow.shift());
  // Once we get too many things in data, remove some.
  // Use nextData to train the scales but use the untrimmed data
  // for rendering so that it looks smooth
  nextData = data.length > maxDataPoints ? data.slice(data.length - maxDataPoints) : data;
  // Cannot show an area unless we have at least 2 points
  if (data.length < 2) return;
  // This is a key step that needs to be done because of the
  // peculiarity of area / line charts
  // (they have one element that represents N data points, unlike a bar chart):
  // reapply the old area function (with the old scale) to the new data
  dPath.attr("d", area(data));
  // Update the scale domains
  x.domain(d3.extent(nextData, function(d) { return d.date; }));
  y.domain([0, d3.max(nextData, function(d) { return d.close; })]);
  // Reapply the axis selection (now that the scales have been updated)
  // yay for transition!
  xAxisSel.transition().duration(900).call(xAxis);
  yAxisSel.transition().duration(900).call(yAxis);
  // Reapply the updated area function to animate the area
  dPath.transition().duration(900).attr("d", area(data));
}, 1000);
function convert(ds) {
  return ds.map(function(d) {
    return {
      date: new Date(d['timestamp']),
      close: d['result']['count']
    };
  });
}
var lastQueryTime = new Date(Date.now() - 60 * 1000); // start from one minute ago
lastQueryTime.setUTCMilliseconds(0);
function doQuery() {
  // Keep now local so each callback sees the end of its own query window
  var now = new Date();
  now.setUTCMilliseconds(0);
  console.log('query!');
  druidQuery(lastQueryTime, now, function(err, results) {
    // Skip this round if the query failed; the next poll retries from the same start
    if (err) {
      console.log('query failed', err);
      return;
    }
    // add results to the data to be shown
    lastQueryTime = now;
    dataToShow = dataToShow.concat(convert(results));
    console.log('dataToShow length', dataToShow.length);
  });
}
doQuery();
setInterval(doQuery, 10000);
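The chart code combines two timers: a query every ten seconds that delivers points in batches, and a one-second tick that releases a single point to the chart so the animation looks continuous. The Python sketch below isolates that buffering idea. The DripFeed class and its method names are hypothetical, and it simplifies the original by not keeping the extra untrimmed point that the JavaScript uses for smooth rendering.

```python
from collections import deque


class DripFeed:
    """Release batched points one at a time and keep a bounded visible window."""

    def __init__(self, max_points=20):
        # Points fetched but not yet shown (the role of dataToShow)
        self.pending = deque()
        # Points currently on the chart; deque drops the oldest when full
        self.visible = deque(maxlen=max_points)

    def add_batch(self, points):
        """Queue the results of one query."""
        self.pending.extend(points)

    def tick(self):
        """Show one more point; return the window, or None if nothing is pending."""
        if not self.pending:
            return None
        self.visible.append(self.pending.popleft())
        return list(self.visible)


if __name__ == "__main__":
    feed = DripFeed(max_points=3)
    feed.add_batch([10, 11, 12, 13, 14])  # made-up counts from one query
    for _ in range(6):
        print(feed.tick())
```

Decoupling the fetch rate from the display rate is what lets the chart move every second while querying Druid only every ten seconds.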
This chart highlights Druid's dual realtime abilities: rapidly consuming large streams and rapidly querying them. We hope it helps illustrate how to use Druid with realtime visualizations!