Merge pull request #500 from metamx/batch-ingestion-fixes

Batch ingestion fixes
diff --git a/docs/content/SelectQuery.md b/docs/content/SelectQuery.md
new file mode 100644
index 0000000..22eb0ca
--- /dev/null
+++ b/docs/content/SelectQuery.md
@@ -0,0 +1,142 @@
+---
+layout: doc_page
+---
+# Select Queries
+Select queries return raw Druid rows and support pagination.
+
+```json
+ {
+   "queryType": "select",
+   "dataSource": "wikipedia",
+   "dimensions":[],
+   "metrics":[],
+   "granularity": "all",
+   "intervals": [
+     "2013-01-01/2013-01-02"
+   ],
+   "pagingSpec":{"pagingIdentifiers": {}, "threshold":5}
+ }
+```
+
+There are several main parts to a select query:
+
+|property|description|required?|
+|--------|-----------|---------|
+|queryType|This String should always be "select"; this is the first thing Druid looks at to figure out how to interpret the query|yes|
+|dataSource|A String defining the data source to query, very similar to a table in a relational database|yes|
+|intervals|A JSON Object representing ISO-8601 Intervals. This defines the time ranges to run the query over.|yes|
+|dimensions|The list of dimensions to select. If left empty, all dimensions are returned.|no|
+|metrics|The list of metrics to select. If left empty, all metrics are returned.|no|
+|pagingSpec|A JSON object indicating offsets into different scanned segments. Select query results will return a pagingSpec that can be reused for pagination.|yes|
+|context|An additional JSON Object which can be used to specify certain flags.|no|
+
+The format of the result is:
+
+```json
+ [{
+  "timestamp" : "2013-01-01T00:00:00.000Z",
+  "result" : {
+    "pagingIdentifiers" : {
+      "wikipedia_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9" : 4
+    },
+    "events" : [ {
+      "segmentId" : "wikipedia_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9",
+      "offset" : 0,
+      "event" : {
+        "timestamp" : "2013-01-01T00:00:00.000Z",
+        "robot" : "1",
+        "namespace" : "article",
+        "anonymous" : "0",
+        "unpatrolled" : "0",
+        "page" : "11._korpus_(NOVJ)",
+        "language" : "sl",
+        "newpage" : "0",
+        "user" : "EmausBot",
+        "count" : 1.0,
+        "added" : 39.0,
+        "delta" : 39.0,
+        "variation" : 39.0,
+        "deleted" : 0.0
+      }
+    }, {
+      "segmentId" : "wikipedia_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9",
+      "offset" : 1,
+      "event" : {
+        "timestamp" : "2013-01-01T00:00:00.000Z",
+        "robot" : "0",
+        "namespace" : "article",
+        "anonymous" : "0",
+        "unpatrolled" : "0",
+        "page" : "112_U.S._580",
+        "language" : "en",
+        "newpage" : "1",
+        "user" : "MZMcBride",
+        "count" : 1.0,
+        "added" : 70.0,
+        "delta" : 70.0,
+        "variation" : 70.0,
+        "deleted" : 0.0
+      }
+    }, {
+      "segmentId" : "wikipedia_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9",
+      "offset" : 2,
+      "event" : {
+        "timestamp" : "2013-01-01T00:00:00.000Z",
+        "robot" : "0",
+        "namespace" : "article",
+        "anonymous" : "0",
+        "unpatrolled" : "0",
+        "page" : "113_U.S._243",
+        "language" : "en",
+        "newpage" : "1",
+        "user" : "MZMcBride",
+        "count" : 1.0,
+        "added" : 77.0,
+        "delta" : 77.0,
+        "variation" : 77.0,
+        "deleted" : 0.0
+      }
+    }, {
+      "segmentId" : "wikipedia_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9",
+      "offset" : 3,
+      "event" : {
+        "timestamp" : "2013-01-01T00:00:00.000Z",
+        "robot" : "0",
+        "namespace" : "article",
+        "anonymous" : "0",
+        "unpatrolled" : "0",
+        "page" : "113_U.S._73",
+        "language" : "en",
+        "newpage" : "1",
+        "user" : "MZMcBride",
+        "count" : 1.0,
+        "added" : 70.0,
+        "delta" : 70.0,
+        "variation" : 70.0,
+        "deleted" : 0.0
+      }
+    }, {
+      "segmentId" : "wikipedia_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9",
+      "offset" : 4,
+      "event" : {
+        "timestamp" : "2013-01-01T00:00:00.000Z",
+        "robot" : "0",
+        "namespace" : "article",
+        "anonymous" : "0",
+        "unpatrolled" : "0",
+        "page" : "113_U.S._756",
+        "language" : "en",
+        "newpage" : "1",
+        "user" : "MZMcBride",
+        "count" : 1.0,
+        "added" : 68.0,
+        "delta" : 68.0,
+        "variation" : 68.0,
+        "deleted" : 0.0
+      }
+    } ]
+  }
+} ]
+```
+
+The result includes a `pagingIdentifiers` map that can be passed back in the `pagingSpec` of the next select query to fetch the following page. Each offset in the map must be increased by 1 on the client side before it is reused.
\ No newline at end of file
diff --git a/docs/content/Twitter-Tutorial.textile b/docs/content/Twitter-Tutorial.md
similarity index 78%
rename from docs/content/Twitter-Tutorial.textile
rename to docs/content/Twitter-Tutorial.md
index fb38023..0f21ec8 100644
--- a/docs/content/Twitter-Tutorial.textile
+++ b/docs/content/Twitter-Tutorial.md
@@ -1,77 +1,93 @@
 ---
 layout: doc_page
 ---
-Greetings! We see you've taken an interest in Druid. That's awesome! Hopefully this tutorial will help clarify some core Druid concepts. We will go through one of the Real-time "Examples":Examples.html, and issue some basic Druid queries. The data source we'll be working with is the "Twitter spritzer stream":https://dev.twitter.com/docs/streaming-apis/streams/public. If you are ready to explore Druid, brave its challenges, and maybe learn a thing or two, read on!
+Greetings! We see you've taken an interest in Druid. That's awesome! Hopefully this tutorial will help clarify some core Druid concepts. We will go through one of the Real-time [Examples](Examples.html), and issue some basic Druid queries. The data source we'll be working with is the [Twitter spritzer stream](https://dev.twitter.com/docs/streaming-apis/streams/public). If you are ready to explore Druid, brave its challenges, and maybe learn a thing or two, read on!
 
-h2. Setting Up
+# Setting Up
 
 There are two ways to setup Druid: download a tarball, or build it from source.
 
-h3. Download a Tarball
+## Download a Tarball
 
-We've built a tarball that contains everything you'll need. You'll find it "here":http://static.druid.io/artifacts/releases/druid-services-0.6.98-bin.tar.gz.
+We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.98-bin.tar.gz).
 Download this bad boy to a directory of your choosing.
 
 You can extract the awesomeness within by issuing:
 
-pre. tar -zxvf druid-services-0.X.X.tar.gz
+```
+tar -zxvf druid-services-0.X.X.tar.gz
+```
 
 Not too lost so far right? That's great! If you cd into the directory:
 
-pre. cd druid-services-0.X.X
+```
+cd druid-services-0.X.X
+```
 
 You should see a bunch of files:
+
 * run_example_server.sh
 * run_example_client.sh
 * LICENSE, config, examples, lib directories
 
-h3. Clone and Build from Source
+## Clone and Build from Source
 
 The other way to setup Druid is from source via git. To do so, run these commands:
 
-<pre><code>git clone git@github.com:metamx/druid.git
+```
+git clone git@github.com:metamx/druid.git
 cd druid
 git checkout druid-0.X.X
 ./build.sh
-</code></pre>
+```
 
 You should see a bunch of files: 
 
-<pre><code>DruidCorporateCLA.pdf	README			common			examples		indexer			pom.xml			server
+```
+DruidCorporateCLA.pdf	README			common			examples		indexer			pom.xml			server
 DruidIndividualCLA.pdf	build.sh		doc			group_by.body		install			publications		services
 LICENSE			client			eclipse_formatting.xml	index-common		merger			realtime
-</code></pre>
+```
 
 You can find the example executables in the examples/bin directory:
+
 * run_example_server.sh
 * run_example_client.sh
 
-h2. Running Example Scripts
+# Running Example Scripts
 
-Let's start doing stuff. You can start a Druid "Realtime":Realtime.html node by issuing:
-<code>./run_example_server.sh</code>
+Let's start doing stuff. You can start a Druid [Realtime](Realtime.html) node by issuing:
+
+```
+./run_example_server.sh
+```
 
 Select "twitter". 
 
-You'll need to register a new application with the twitter API, which only takes a minute. Go to "https://twitter.com/oauth_clients/new":https://twitter.com/oauth_clients/new and fill out the form and submit. Don't worry, the home page and callback url can be anything. This will generate keys for the Twitter example application. Take note of the values for consumer key/secret and access token/secret.
+You'll need to register a new application with the twitter API, which only takes a minute. Go to [https://twitter.com/oauth_clients/new](https://twitter.com/oauth_clients/new), fill out the form, and submit it. Don't worry, the home page and callback url can be anything. This will generate keys for the Twitter example application. Take note of the values for consumer key/secret and access token/secret.
 
 Enter your credentials when prompted.
 
 Once the node starts up you will see a bunch of logs about setting up properties and connecting to the data source. If everything was successful, you should see messages of the form shown below. If you see crazy exceptions, you probably typed in your login information incorrectly.
-<pre><code>2013-05-17 23:04:40,934 INFO [main] org.mortbay.log - Started SelectChannelConnector@0.0.0.0:8080
+
+```
+2013-05-17 23:04:40,934 INFO [main] org.mortbay.log - Started SelectChannelConnector@0.0.0.0:8080
 2013-05-17 23:04:40,935 INFO [main] com.metamx.common.lifecycle.Lifecycle$AnnotationBasedHandler - Invoking start method[public void com.metamx.druid.http.FileRequestLogger.start()] on object[com.metamx.druid.http.FileRequestLogger@42bb0406].
 2013-05-17 23:04:41,578 INFO [Twitter Stream consumer-1[Establishing connection]] twitter4j.TwitterStreamImpl - Connection established.
 2013-05-17 23:04:41,578 INFO [Twitter Stream consumer-1[Establishing connection]] io.druid.examples.twitter.TwitterSpritzerFirehoseFactory - Connected_to_Twitter
 2013-05-17 23:04:41,578 INFO [Twitter Stream consumer-1[Establishing connection]] twitter4j.TwitterStreamImpl - Receiving status stream.
-</code></pre>
+```
 
 Periodically, you'll also see messages of the form:
-<pre><code>2013-05-17 23:04:59,793 INFO [chief-twitterstream] io.druid.examples.twitter.TwitterSpritzerFirehoseFactory - nextRow() has returned 1,000 InputRows
-</code></pre>
+
+```
+2013-05-17 23:04:59,793 INFO [chief-twitterstream] io.druid.examples.twitter.TwitterSpritzerFirehoseFactory - nextRow() has returned 1,000 InputRows
+```
 
 These messages indicate you are ingesting events. The Druid real time-node ingests events in an in-memory buffer. Periodically, these events will be persisted to disk. Persisting to disk generates a whole bunch of logs:
 
-<pre><code>2013-05-17 23:06:40,918 INFO [chief-twitterstream] com.metamx.druid.realtime.plumber.RealtimePlumberSchool - Submitting persist runnable for dataSource[twitterstream]
+```
+2013-05-17 23:06:40,918 INFO [chief-twitterstream] com.metamx.druid.realtime.plumber.RealtimePlumberSchool - Submitting persist runnable for dataSource[twitterstream]
 2013-05-17 23:06:40,920 INFO [twitterstream-incremental-persist] com.metamx.druid.realtime.plumber.RealtimePlumberSchool - DataSource[twitterstream], Interval[2013-05-17T23:00:00.000Z/2013-05-18T00:00:00.000Z], persisting Hydrant[FireHydrant{index=com.metamx.druid.index.v1.IncrementalIndex@126212dd, queryable=com.metamx.druid.index.IncrementalIndexSegment@64c47498, count=0}]
 2013-05-17 23:06:40,937 INFO [twitterstream-incremental-persist] com.metamx.druid.index.v1.IndexMerger - Starting persist for interval[2013-05-17T23:00:00.000Z/2013-05-17T23:07:00.000Z], rows[4,666]
 2013-05-17 23:06:41,039 INFO [twitterstream-incremental-persist] com.metamx.druid.index.v1.IndexMerger - outDir[/tmp/example/twitter_realtime/basePersist/twitterstream/2013-05-17T23:00:00.000Z_2013-05-18T00:00:00.000Z/0/v8-tmp] completed index.drd in 11 millis.
@@ -88,16 +104,20 @@
 2013-05-17 23:06:41,425 INFO [twitterstream-incremental-persist] com.metamx.druid.index.v1.IndexIO$DefaultIndexIOHandler - Converting v8[/tmp/example/twitter_realtime/basePersist/twitterstream/2013-05-17T23:00:00.000Z_2013-05-18T00:00:00.000Z/0/v8-tmp] to v9[/tmp/example/twitter_realtime/basePersist/twitterstream/2013-05-17T23:00:00.000Z_2013-05-18T00:00:00.000Z/0]
 2013-05-17 23:06:41,426 INFO [twitterstream-incremental-persist] 
 ... ETC
-</code></pre>
+```
 
 The logs are about building different columns, probably not the most exciting stuff (they might as well be in Vulcan) if are you learning about Druid for the first time. Nevertheless, if you are interested in the details of our real-time architecture and why we persist indexes to disk, I suggest you read our "White Paper":http://static.druid.io/docs/druid.pdf.
 
 Okay, things are about to get real (-time). To query the real-time node you've spun up, you can issue:
-<pre>./run_example_client.sh</pre>
 
-Select "twitter" once again. This script issues ["GroupByQuery":GroupByQuery.html]s to the twitter data we've been ingesting. The query looks like this:
+```
+./run_example_client.sh
+```
 
-<pre><code>{
+Select "twitter" once again. This script issues [GroupByQueries](GroupByQuery.html) to the twitter data we've been ingesting. The query looks like this:
+
+```json
+{
     "queryType": "groupBy",
     "dataSource": "twitterstream",
     "granularity": "all",
@@ -109,13 +129,14 @@
     "filter": { "type": "selector", "dimension": "lang", "value": "en" },
     "intervals":["2012-10-01T00:00/2020-01-01T00"]
 }
-</code></pre>
+```
 
 This is a **groupBy** query, which you may be familiar with from SQL. We are grouping, or aggregating, via the **dimensions** field: ["lang", "utc_offset"]. We are **filtering** via the **"lang"** dimension, to only look at english tweets. Our **aggregations** are what we are calculating: a row count, and the sum of the tweets in our data.
 
 The result looks something like this:
 
-<pre><code>[
+```json
+[
     {
         "version": "v1",
         "timestamp": "2012-10-01T00:00:00.000Z",
@@ -137,41 +158,48 @@
         }
     },
 ...
-</code></pre>
+```
 
 This data, plotted in a time series/distribution, looks something like this:
 
-!http://metamarkets.com/wp-content/uploads/2013/06/tweets_timezone_offset.png(Timezone / Tweets Scatter Plot)!
+![Timezone / Tweets Scatter Plot](http://metamarkets.com/wp-content/uploads/2013/06/tweets_timezone_offset.png)
 
-This groupBy query is a bit complicated and we'll return to it later. For the time being, just make sure you are getting some blocks of data back. If you are having problems, make sure you have "curl":http://curl.haxx.se/ installed. Control+C to break out of the client script.
+This groupBy query is a bit complicated and we'll return to it later. For the time being, just make sure you are getting some blocks of data back. If you are having problems, make sure you have [curl](http://curl.haxx.se/) installed. Control+C to break out of the client script.
 
-h2. Querying Druid
+# Querying Druid
 
 In your favorite editor, create the file:
-<pre>time_boundary_query.body</pre>
+
+```
+time_boundary_query.body
+```
 
 Druid queries are JSON blobs which are relatively painless to create programmatically, but an absolute pain to write by hand. So anyway, we are going to create a Druid query by hand. Add the following to the file you just created:
-<pre><code>{

+
+```json
+{

   "queryType"  : "timeBoundary",
   "dataSource" : "twitterstream"
 }
-</code></pre>
+```
 
 The "TimeBoundaryQuery":TimeBoundaryQuery.html is one of the simplest Druid queries. To run the query, you can issue:
-<pre><code> 
+
+```
 curl -X POST 'http://localhost:8080/druid/v2/?pretty' -H 'content-type: application/json'  -d @time_boundary_query.body
-</code></pre>
+```
 
 We get something like this JSON back:
 
-<pre><code>[ {
+```json
+[ {
   "timestamp" : "2013-06-10T19:09:00.000Z",
   "result" : {
     "minTime" : "2013-06-10T19:09:00.000Z",
     "maxTime" : "2013-06-10T20:50:00.000Z"
   }
 } ]
-</code></pre>
+```
 
 That's the result. What information do you think the result is conveying? 
 ...
@@ -179,11 +207,14 @@
 
 Return to your favorite editor and create the file:
 
-<pre>timeseries_query.body</pre>
+```
+timeseries_query.body
+```
 
-We are going to make a slightly more complicated query, the "TimeseriesQuery":TimeseriesQuery.html. Copy and paste the following into the file:
+We are going to make a slightly more complicated query, the [TimeseriesQuery](TimeseriesQuery.html). Copy and paste the following into the file:
 
-<pre><code>{
+```json
+{
   "queryType":"timeseries",
   "dataSource":"twitterstream",
   "intervals":["2010-01-01/2020-01-01"],
@@ -193,22 +224,26 @@
       { "type": "doubleSum", "fieldName": "tweets", "name": "tweets"}
   ]
 }
-</code></pre>
+```
 
-You are probably wondering, what are these "Granularities":Granularities.html and "Aggregations":Aggregations.html things? What the query is doing is aggregating some metrics over some span of time. 
+You are probably wondering, what are these [Granularities](Granularities.html) and [Aggregations](Aggregations.html) things? What the query is doing is aggregating some metrics over some span of time.
 To issue the query and get some results, run the following in your command line:
-<pre><code>curl -X POST 'http://localhost:8080/druid/v2/?pretty' -H 'content-type: application/json'  -d @timeseries_query.body</code></pre>
+
+```
+curl -X POST 'http://localhost:8080/druid/v2/?pretty' -H 'content-type: application/json'  -d @timeseries_query.body
+```
 
 Once again, you should get a JSON blob of text back with your results, that looks something like this:
 
-<pre><code>[ {
+```json
+[ {
   "timestamp" : "2013-06-10T19:09:00.000Z",
   "result" : {
     "tweets" : 358562.0,
     "rows" : 272271
   }
 } ]
-</code></pre>
+```
 
 If you issue the query again, you should notice your results updating.
 
@@ -216,7 +251,8 @@
 
 If you loudly exclaimed "we can change granularity to minute", you are absolutely correct again! We can specify different granularities to bucket our results, like so:
 
-<pre><code>{
+```json
+{
   "queryType":"timeseries",
   "dataSource":"twitterstream",
   "intervals":["2010-01-01/2020-01-01"],
@@ -226,11 +262,12 @@
       { "type": "doubleSum", "fieldName": "tweets", "name": "tweets"}
   ]
 }
-</code></pre>
+```
 
 This gives us something like the following:
 
-<pre><code>[ {
+```json
+[ {
   "timestamp" : "2013-06-10T19:09:00.000Z",
   "result" : {
     "tweets" : 2650.0,
@@ -250,16 +287,21 @@
   }
 },
 ...
-</code></pre>
+```
 
-h2. Solving a Problem
+# Solving a Problem
 
 One of Druid's main powers (see what we did there?) is to provide answers to problems, so let's pose a problem. What if we wanted to know what the top hash tags are, ordered by the number tweets, where the language is english, over the last few minutes you've been reading this tutorial? To solve this problem, we have to return to the query we introduced at the very beginning of this tutorial, the "GroupByQuery":GroupByQuery.html. It would be nice if we could group by results by dimension value and somehow sort those results... and it turns out we can! 
 
 Let's create the file:
-<pre>group_by_query.body</pre>
+
+```
+group_by_query.body
+```
 and put the following in there:
-<pre><code>{
+
+```json
+{
     "queryType": "groupBy",
     "dataSource": "twitterstream",
     "granularity": "all",
@@ -271,16 +313,20 @@
     "filter": {"type": "selector", "dimension": "lang", "value": "en" },
     "intervals":["2012-10-01T00:00/2020-01-01T00"]
 }
-</code></pre>
+```
 
 Woah! Our query just got a way more complicated. Now we have these "Filters":Filters.html things and this "OrderBy":OrderBy.html thing. Fear not, it turns out the new objects we've introduced to our query can help define the format of our results and provide an answer to our question.
 
 If you issue the query:
-<pre><code>curl -X POST 'http://localhost:8080/druid/v2/?pretty' -H 'content-type: application/json'  -d @group_by_query.body</code></pre>
+
+```
+curl -X POST 'http://localhost:8080/druid/v2/?pretty' -H 'content-type: application/json'  -d @group_by_query.body
+```
 
 You should hopefully see an answer to our question. For my twitter stream, it looks like this:
 
-<pre><code>[ {
+```json
+[ {
   "version" : "v1",
   "timestamp" : "2012-10-01T00:00:00.000Z",
   "event" : {
@@ -316,12 +362,12 @@
     "htags" : "IDidntTextYouBackBecause"
   }
 } ]
-</code></pre>
+```
 
 Feel free to tweak other query parameters to answer other questions you may have about the data.
 
-h2. Additional Information
+# Additional Information
 
-This tutorial is merely showcasing a small fraction of what Druid can do. Next, continue on to "The Druid Cluster":./Tutorial:-The-Druid-Cluster.html.
+This tutorial is merely showcasing a small fraction of what Druid can do. Next, continue on to [The Druid Cluster](./Tutorial:-The-Druid-Cluster.html).
 
-And thus concludes our journey! Hopefully you learned a thing or two about Druid real-time ingestion, querying Druid, and how Druid can be used to solve problems. If you have additional questions, feel free to post in our "google groups page":http://www.groups.google.com/forum/#!forum/druid-development.
+And thus concludes our journey! Hopefully you learned a thing or two about Druid real-time ingestion, querying Druid, and how Druid can be used to solve problems. If you have additional questions, feel free to post in our [google groups page](http://www.groups.google.com/forum/#!forum/druid-development).
diff --git a/docs/content/toc.textile b/docs/content/toc.textile
index c17a753..98395fc 100644
--- a/docs/content/toc.textile
+++ b/docs/content/toc.textile
@@ -75,6 +75,7 @@
 h2. Experimental
 * "About Experimental Features":./About-Experimental-Features.html
 * "Geographic Queries":./GeographicQueries.html
+* "Select Query":./SelectQuery.html
 
 h2. Development
 * "Versioning":./Versioning.html
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/DbUpdaterJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/DbUpdaterJob.java
index 347fa85..a238d5e 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/DbUpdaterJob.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/DbUpdaterJob.java
@@ -40,13 +40,15 @@
 
   private final HadoopDruidIndexerConfig config;
   private final IDBI dbi;
+  private final DbConnector dbConnector;
 
   public DbUpdaterJob(
       HadoopDruidIndexerConfig config
   )
   {
     this.config = config;
-    this.dbi = new DbConnector(config.getUpdaterJobSpec(), null).getDBI();
+    this.dbConnector = new DbConnector(config.getUpdaterJobSpec(), null);
+    this.dbi = this.dbConnector.getDBI();
   }
 
   @Override
@@ -62,8 +64,11 @@
           {
             final PreparedBatch batch = handle.prepareBatch(
                 String.format(
-                    "INSERT INTO %s (id, dataSource, created_date, start, end, partitioned, version, used, payload) "
-                    + "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
+                    dbConnector.isPostgreSQL() ?
+                      "INSERT INTO %s (id, dataSource, created_date, start, \"end\", partitioned, version, used, payload) "
+                      + "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)" :
+                      "INSERT INTO %s (id, dataSource, created_date, start, end, partitioned, version, used, payload) "
+                      + "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
                     config.getUpdaterJobSpec().getSegmentTable()
                 )
             );
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java
index 7e9d403..22529c6 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java
@@ -66,7 +66,8 @@
     extensionsConfig = Initialization.makeStartupInjector().getInstance(ExtensionsConfig.class);
   }
 
-  private static String defaultHadoopCoordinates = "org.apache.hadoop:hadoop-client:2.3.0";
+  public static String DEFAULT_HADOOP_COORDINATES = "org.apache.hadoop:hadoop-client:2.3.0";
+
   @JsonIgnore
   private final HadoopDruidIndexerSchema schema;
   @JsonIgnore
@@ -102,7 +103,7 @@
 
     this.schema = schema;
     this.hadoopDependencyCoordinates = hadoopDependencyCoordinates == null ? Arrays.<String>asList(
-        hadoopCoordinates == null ? defaultHadoopCoordinates : hadoopCoordinates
+        hadoopCoordinates == null ? DEFAULT_HADOOP_COORDINATES : hadoopCoordinates
     ) : hadoopDependencyCoordinates;
   }
 
diff --git a/pom.xml b/pom.xml
index 74c189d..c0c9f69 100644
--- a/pom.xml
+++ b/pom.xml
@@ -74,7 +74,7 @@
             <dependency>
                 <groupId>com.metamx</groupId>
                 <artifactId>emitter</artifactId>
-                <version>0.2.9</version>
+                <version>0.2.11</version>
             </dependency>
             <dependency>
                 <groupId>com.metamx</groupId>
diff --git a/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java b/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java
index 7762764..4dbb75b 100644
--- a/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java
+++ b/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java
@@ -35,8 +35,10 @@
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
 /**
@@ -110,7 +112,11 @@
                                   if (input == null) {
                                     throw new ISE("Input is null?! How is this possible?!");
                                   }
-                                  return Sequences.toList(input.run(query), Lists.<T>newArrayList());
+                                  Sequence<T> result = input.run(query);
+                                  if (result == null) {
+                                    throw new ISE("Got a null result! Segments are missing!");
+                                  }
+                                  return Sequences.toList(result, Lists.<T>newArrayList());
                                 }
                                 catch (Exception e) {
                                   log.error(e, "Exception with one of the sequences!");
diff --git a/processing/src/main/java/io/druid/query/aggregation/DoubleSumAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/DoubleSumAggregatorFactory.java
index ebd4e18..c7f3eba 100644
--- a/processing/src/main/java/io/druid/query/aggregation/DoubleSumAggregatorFactory.java
+++ b/processing/src/main/java/io/druid/query/aggregation/DoubleSumAggregatorFactory.java
@@ -88,6 +88,10 @@
   @Override
   public Object deserialize(Object object)
   {
+    // handle "NaN" / "Infinity" values serialized as strings in JSON
+    if (object instanceof String) {
+      return Double.parseDouble((String) object);
+    }
     return object;
   }
 
diff --git a/processing/src/main/java/io/druid/query/aggregation/JavaScriptAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/JavaScriptAggregatorFactory.java
index 6de6be0..85bd959 100644
--- a/processing/src/main/java/io/druid/query/aggregation/JavaScriptAggregatorFactory.java
+++ b/processing/src/main/java/io/druid/query/aggregation/JavaScriptAggregatorFactory.java
@@ -139,6 +139,10 @@
   @Override
   public Object deserialize(Object object)
   {
+    // handle "NaN" / "Infinity" values serialized as strings in JSON
+    if (object instanceof String) {
+      return Double.parseDouble((String) object);
+    }
     return object;
   }
 
diff --git a/processing/src/main/java/io/druid/query/aggregation/MaxAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/MaxAggregatorFactory.java
index ee8217f..b731c43 100644
--- a/processing/src/main/java/io/druid/query/aggregation/MaxAggregatorFactory.java
+++ b/processing/src/main/java/io/druid/query/aggregation/MaxAggregatorFactory.java
@@ -85,6 +85,7 @@
   @Override
   public Object deserialize(Object object)
   {
+    // handle "NaN" / "Infinity" values serialized as strings in JSON
     if (object instanceof String) {
       return Double.parseDouble((String) object);
     }
diff --git a/processing/src/main/java/io/druid/query/aggregation/MinAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/MinAggregatorFactory.java
index 9c3d560..d3956c9 100644
--- a/processing/src/main/java/io/druid/query/aggregation/MinAggregatorFactory.java
+++ b/processing/src/main/java/io/druid/query/aggregation/MinAggregatorFactory.java
@@ -85,6 +85,7 @@
   @Override
   public Object deserialize(Object object)
   {
+    // handle "NaN" / "Infinity" values serialized as strings in JSON
     if (object instanceof String) {
       return Double.parseDouble((String) object);
     }
diff --git a/processing/src/main/java/io/druid/query/filter/JavaScriptDimFilter.java b/processing/src/main/java/io/druid/query/filter/JavaScriptDimFilter.java
index b9cef36..1f87405 100644
--- a/processing/src/main/java/io/druid/query/filter/JavaScriptDimFilter.java
+++ b/processing/src/main/java/io/druid/query/filter/JavaScriptDimFilter.java
@@ -67,4 +67,13 @@
         .put(functionBytes)
         .array();
   }
+
+  @Override
+  public String toString()
+  {
+    return "JavaScriptDimFilter{" +
+           "dimension='" + dimension + '\'' +
+           ", function='" + function + '\'' +
+           '}';
+  }
 }
diff --git a/processing/src/main/java/io/druid/query/filter/RegexDimFilter.java b/processing/src/main/java/io/druid/query/filter/RegexDimFilter.java
index 0644250..9a327dd 100644
--- a/processing/src/main/java/io/druid/query/filter/RegexDimFilter.java
+++ b/processing/src/main/java/io/druid/query/filter/RegexDimFilter.java
@@ -69,4 +69,13 @@
         .put(patternBytes)
         .array();
   }
+
+  @Override
+  public String toString()
+  {
+    return "RegexDimFilter{" +
+           "dimension='" + dimension + '\'' +
+           ", pattern='" + pattern + '\'' +
+           '}';
+  }
 }
diff --git a/processing/src/main/java/io/druid/query/filter/SearchQueryDimFilter.java b/processing/src/main/java/io/druid/query/filter/SearchQueryDimFilter.java
index 76c5ecd..c6d09c0 100644
--- a/processing/src/main/java/io/druid/query/filter/SearchQueryDimFilter.java
+++ b/processing/src/main/java/io/druid/query/filter/SearchQueryDimFilter.java
@@ -64,9 +64,18 @@
     final byte[] queryBytes = query.getCacheKey();
 
     return ByteBuffer.allocate(1 + dimensionBytes.length + queryBytes.length)
-        .put(DimFilterCacheHelper.SEARCH_QUERY_TYPE_ID)
-        .put(dimensionBytes)
-        .put(queryBytes)
-        .array();
+                     .put(DimFilterCacheHelper.SEARCH_QUERY_TYPE_ID)
+                     .put(dimensionBytes)
+                     .put(queryBytes)
+                     .array();
+  }
+
+  @Override
+  public String toString()
+  {
+    return "SearchQueryDimFilter{" +
+           "dimension='" + dimension + '\'' +
+           ", query=" + query +
+           '}';
   }
 }
diff --git a/processing/src/main/java/io/druid/query/filter/SpatialDimFilter.java b/processing/src/main/java/io/druid/query/filter/SpatialDimFilter.java
index 6899d30..2abcc92 100644
--- a/processing/src/main/java/io/druid/query/filter/SpatialDimFilter.java
+++ b/processing/src/main/java/io/druid/query/filter/SpatialDimFilter.java
@@ -99,4 +99,13 @@
     result = 31 * result + (bound != null ? bound.hashCode() : 0);
     return result;
   }
+
+  @Override
+  public String toString()
+  {
+    return "SpatialDimFilter{" +
+           "dimension='" + dimension + '\'' +
+           ", bound=" + bound +
+           '}';
+  }
 }
diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryEngine.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryEngine.java
index 888a034..1c75f13 100644
--- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryEngine.java
+++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryEngine.java
@@ -70,7 +70,7 @@
   private final StupidPool<ByteBuffer> intermediateResultsBufferPool;
 
   @Inject
-  public GroupByQueryEngine (
+  public GroupByQueryEngine(
       Supplier<GroupByQueryConfig> config,
       @Global StupidPool<ByteBuffer> intermediateResultsBufferPool
   )
@@ -81,6 +81,12 @@
 
   public Sequence<Row> process(final GroupByQuery query, StorageAdapter storageAdapter)
   {
+    if (storageAdapter == null) {
+      throw new ISE(
+          "Null storage adapter found. Probably trying to issue a query against a segment being memory unmapped."
+      );
+    }
+
     final List<Interval> intervals = query.getQuerySegmentSpec().getIntervals();
     if (intervals.size() != 1) {
       throw new IAE("Should only have one interval, got[%s]", intervals);
@@ -187,8 +193,7 @@
           ByteBuffer newKey = key.duplicate();
           newKey.putInt(dimSelector.getValueCardinality());
           unaggregatedBuffers = updateValues(newKey, dims.subList(1, dims.size()));
-        }
-        else {
+        } else {
           for (Integer dimValue : row) {
             ByteBuffer newKey = key.duplicate();
             newKey.putInt(dimValue);
@@ -202,8 +207,7 @@
           retVal.addAll(unaggregatedBuffers);
         }
         return retVal;
-      }
-      else {
+      } else {
         key.clear();
         Integer position = positions.get(key);
         int[] increments = positionMaintainer.getIncrements();
@@ -267,8 +271,7 @@
     {
       if (nextVal > max) {
         return null;
-      }
-      else {
+      } else {
         int retVal = (int) nextVal;
         nextVal += increment;
         return retVal;
@@ -402,7 +405,7 @@
                     final DimExtractionFn fn = dimensionSpecs.get(i).getDimExtractionFn();
                     final int dimVal = keyBuffer.getInt();
                     if (dimSelector.getValueCardinality() != dimVal) {
-                      if(fn != null) {
+                      if (fn != null) {
                         theEvent.put(dimNames.get(i), fn.apply(dimSelector.lookupName(dimVal)));
                       } else {
                         theEvent.put(dimNames.get(i), dimSelector.lookupName(dimVal));
@@ -434,9 +437,10 @@
       throw new UnsupportedOperationException();
     }
 
-    public void close() {
+    public void close()
+    {
       // cleanup
-      for(BufferAggregator agg : aggregators) {
+      for (BufferAggregator agg : aggregators) {
         agg.close();
       }
     }
diff --git a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java
index d600b2b..c5b64c2 100644
--- a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java
+++ b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java
@@ -104,47 +104,47 @@
   )
   {
     return new ConcatQueryRunner<SegmentAnalysis>(
-            Sequences.map(
-                Sequences.simple(queryRunners),
-                new Function<QueryRunner<SegmentAnalysis>, QueryRunner<SegmentAnalysis>>()
+        Sequences.map(
+            Sequences.simple(queryRunners),
+            new Function<QueryRunner<SegmentAnalysis>, QueryRunner<SegmentAnalysis>>()
+            {
+              @Override
+              public QueryRunner<SegmentAnalysis> apply(final QueryRunner<SegmentAnalysis> input)
+              {
+                return new QueryRunner<SegmentAnalysis>()
                 {
                   @Override
-                  public QueryRunner<SegmentAnalysis> apply(final QueryRunner<SegmentAnalysis> input)
+                  public Sequence<SegmentAnalysis> run(final Query<SegmentAnalysis> query)
                   {
-                    return new QueryRunner<SegmentAnalysis>()
-                    {
-                      @Override
-                      public Sequence<SegmentAnalysis> run(final Query<SegmentAnalysis> query)
-                      {
 
-                        Future<Sequence<SegmentAnalysis>> future = queryExecutor.submit(
-                            new Callable<Sequence<SegmentAnalysis>>()
-                            {
-                              @Override
-                              public Sequence<SegmentAnalysis> call() throws Exception
-                              {
-                                return new ExecutorExecutingSequence<SegmentAnalysis>(
-                                    input.run(query),
-                                    queryExecutor
-                                );
-                              }
-                            }
-                        );
-                        try {
-                          return future.get();
+                    Future<Sequence<SegmentAnalysis>> future = queryExecutor.submit(
+                        new Callable<Sequence<SegmentAnalysis>>()
+                        {
+                          @Override
+                          public Sequence<SegmentAnalysis> call() throws Exception
+                          {
+                            return new ExecutorExecutingSequence<SegmentAnalysis>(
+                                input.run(query),
+                                queryExecutor
+                            );
+                          }
                         }
-                        catch (InterruptedException e) {
-                          throw Throwables.propagate(e);
-                        }
-                        catch (ExecutionException e) {
-                          throw Throwables.propagate(e);
-                        }
-                      }
-                    };
+                    );
+                    try {
+                      return future.get();
+                    }
+                    catch (InterruptedException e) {
+                      throw Throwables.propagate(e);
+                    }
+                    catch (ExecutionException e) {
+                      throw Throwables.propagate(e);
+                    }
                   }
-                }
-            )
-        );
+                };
+              }
+            }
+        )
+    );
   }
 
   @Override
diff --git a/processing/src/main/java/io/druid/query/metadata/metadata/SegmentMetadataQuery.java b/processing/src/main/java/io/druid/query/metadata/metadata/SegmentMetadataQuery.java
index 098a546..c6d6ecc 100644
--- a/processing/src/main/java/io/druid/query/metadata/metadata/SegmentMetadataQuery.java
+++ b/processing/src/main/java/io/druid/query/metadata/metadata/SegmentMetadataQuery.java
@@ -32,7 +32,6 @@
 
 public class SegmentMetadataQuery extends BaseQuery<SegmentAnalysis>
 {
-
   private final ColumnIncluderator toInclude;
   private final boolean merge;
 
diff --git a/processing/src/main/java/io/druid/query/search/SearchQueryRunner.java b/processing/src/main/java/io/druid/query/search/SearchQueryRunner.java
index 6654762..4070a27 100644
--- a/processing/src/main/java/io/druid/query/search/SearchQueryRunner.java
+++ b/processing/src/main/java/io/druid/query/search/SearchQueryRunner.java
@@ -55,7 +55,7 @@
 import java.util.TreeSet;
 
 /**
-*/
+ */
 public class SearchQueryRunner implements QueryRunner<Result<SearchResultValue>>
 {
   private static final EmittingLogger log = new EmittingLogger(SearchQueryRunner.class);
@@ -99,12 +99,10 @@
           ConciseSet set = new ConciseSet();
           set.add(0);
           baseFilter = ImmutableConciseSet.newImmutableFromMutable(set);
-        }
-        else {
+        } else {
           baseFilter = ImmutableConciseSet.complement(new ImmutableConciseSet(), index.getNumRows());
         }
-      }
-      else {
+      } else {
         baseFilter = filter.goConcise(new ColumnSelectorBitmapIndexSelector(index));
       }
 
@@ -133,49 +131,52 @@
     }
 
     final StorageAdapter adapter = segment.asStorageAdapter();
-    if (adapter != null) {
-      Iterable<String> dimsToSearch;
-      if (dimensions == null || dimensions.isEmpty()) {
-        dimsToSearch = adapter.getAvailableDimensions();
-      } else {
-        dimsToSearch = dimensions;
+
+    if (adapter == null) {
+      log.makeAlert("WTF!? Unable to process search query on segment.")
+         .addData("segment", segment.getIdentifier())
+         .addData("query", query).emit();
+      throw new ISE(
+          "Null storage adapter found. Probably trying to issue a query against a segment being memory unmapped."
+      );
+    }
+
+    Iterable<String> dimsToSearch;
+    if (dimensions == null || dimensions.isEmpty()) {
+      dimsToSearch = adapter.getAvailableDimensions();
+    } else {
+      dimsToSearch = dimensions;
+    }
+
+    final TreeSet<SearchHit> retVal = Sets.newTreeSet(query.getSort().getComparator());
+
+    final Iterable<Cursor> cursors = adapter.makeCursors(filter, segment.getDataInterval(), QueryGranularity.ALL);
+    for (Cursor cursor : cursors) {
+      Map<String, DimensionSelector> dimSelectors = Maps.newHashMap();
+      for (String dim : dimsToSearch) {
+        dimSelectors.put(dim, cursor.makeDimensionSelector(dim));
       }
 
-      final TreeSet<SearchHit> retVal = Sets.newTreeSet(query.getSort().getComparator());
-
-      final Iterable<Cursor> cursors = adapter.makeCursors(filter, segment.getDataInterval(), QueryGranularity.ALL);
-      for (Cursor cursor : cursors) {
-        Map<String, DimensionSelector> dimSelectors = Maps.newHashMap();
-        for (String dim : dimsToSearch) {
-          dimSelectors.put(dim, cursor.makeDimensionSelector(dim));
-        }
-
-        while (!cursor.isDone()) {
-          for (Map.Entry<String, DimensionSelector> entry : dimSelectors.entrySet()) {
-            final DimensionSelector selector = entry.getValue();
-            final IndexedInts vals = selector.getRow();
-            for (int i = 0; i < vals.size(); ++i) {
-              final String dimVal = selector.lookupName(vals.get(i));
-              if (searchQuerySpec.accept(dimVal)) {
-                retVal.add(new SearchHit(entry.getKey(), dimVal));
-                if (retVal.size() >= limit) {
-                  return makeReturnResult(limit, retVal);
-                }
+      while (!cursor.isDone()) {
+        for (Map.Entry<String, DimensionSelector> entry : dimSelectors.entrySet()) {
+          final DimensionSelector selector = entry.getValue();
+          final IndexedInts vals = selector.getRow();
+          for (int i = 0; i < vals.size(); ++i) {
+            final String dimVal = selector.lookupName(vals.get(i));
+            if (searchQuerySpec.accept(dimVal)) {
+              retVal.add(new SearchHit(entry.getKey(), dimVal));
+              if (retVal.size() >= limit) {
+                return makeReturnResult(limit, retVal);
               }
             }
           }
-
-          cursor.advance();
         }
-      }
 
-      return makeReturnResult(limit, retVal);
+        cursor.advance();
+      }
     }
 
-    log.makeAlert("WTF!? Unable to process search query on segment.")
-       .addData("segment", segment.getIdentifier())
-       .addData("query", query);
-    return Sequences.empty();
+    return makeReturnResult(limit, retVal);
   }
 
   private Sequence<Result<SearchResultValue>> makeReturnResult(int limit, TreeSet<SearchHit> retVal)
diff --git a/processing/src/main/java/io/druid/query/select/SelectQueryEngine.java b/processing/src/main/java/io/druid/query/select/SelectQueryEngine.java
index 3238ac0..f5bc7ab 100644
--- a/processing/src/main/java/io/druid/query/select/SelectQueryEngine.java
+++ b/processing/src/main/java/io/druid/query/select/SelectQueryEngine.java
@@ -22,6 +22,7 @@
 import com.google.common.base.Function;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.metamx.common.ISE;
 import com.metamx.common.guava.BaseSequence;
 import com.metamx.common.guava.Sequence;
 import io.druid.query.QueryRunnerHelper;
@@ -54,6 +55,12 @@
           {
             final StorageAdapter adapter = segment.asStorageAdapter();
 
+            if (adapter == null) {
+              throw new ISE(
+                  "Null storage adapter found. Probably trying to issue a query against a segment being memory unmapped."
+              );
+            }
+
             final Iterable<String> dims;
             if (query.getDimensions() == null || query.getDimensions().isEmpty()) {
               dims = adapter.getAvailableDimensions();
diff --git a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java
index cd732de..16e9ae8 100644
--- a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java
+++ b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java
@@ -87,6 +87,12 @@
             @Override
             public Iterator<Result<TimeBoundaryResultValue>> make()
             {
+              if (adapter == null) {
+                throw new ISE(
+                    "Null storage adapter found. Probably trying to issue a query against a segment being memory unmapped."
+                );
+              }
+
               return legacyQuery.buildResult(
                   adapter.getInterval().getStart(),
                   adapter.getMinTime(),
diff --git a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryEngine.java b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryEngine.java
index 3062b66..6ab4247 100644
--- a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryEngine.java
+++ b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryEngine.java
@@ -20,6 +20,7 @@
 package io.druid.query.timeseries;
 
 import com.google.common.base.Function;
+import com.metamx.common.ISE;
 import com.metamx.common.guava.BaseSequence;
 import com.metamx.common.guava.Sequence;
 import io.druid.query.QueryRunnerHelper;
@@ -40,6 +41,12 @@
 {
   public Sequence<Result<TimeseriesResultValue>> process(final TimeseriesQuery query, final StorageAdapter adapter)
   {
+    if (adapter == null) {
+      throw new ISE(
+          "Null storage adapter found. Probably trying to issue a query against a segment being memory unmapped."
+      );
+    }
+
     return new BaseSequence<Result<TimeseriesResultValue>, Iterator<Result<TimeseriesResultValue>>>(
         new BaseSequence.IteratorMaker<Result<TimeseriesResultValue>, Iterator<Result<TimeseriesResultValue>>>()
         {
diff --git a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java
index 6fe5515..482e92a 100644
--- a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java
+++ b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java
@@ -265,8 +265,12 @@
         final Map<String, Object> values = Maps.newHashMap();
         final TimeseriesResultValue holder = result.getValue();
         if (calculatePostAggs) {
+          // put non-finalized aggregator values in first; dependent post-aggregators are computed from them
+          for (AggregatorFactory agg : query.getAggregatorSpecs()) {
+            values.put(agg.getName(), holder.getMetric(agg.getName()));
+          }
           for (PostAggregator postAgg : query.getPostAggregatorSpecs()) {
-            values.put(postAgg.getName(), postAgg.compute(holder.getBaseObject()));
+            values.put(postAgg.getName(), postAgg.compute(values));
           }
         }
         for (AggregatorFactory agg : query.getAggregatorSpecs()) {
diff --git a/processing/src/main/java/io/druid/query/topn/TopNBinaryFn.java b/processing/src/main/java/io/druid/query/topn/TopNBinaryFn.java
index e2a9284..4c02da4 100644
--- a/processing/src/main/java/io/druid/query/topn/TopNBinaryFn.java
+++ b/processing/src/main/java/io/druid/query/topn/TopNBinaryFn.java
@@ -90,19 +90,7 @@
     TopNResultValue arg2Vals = arg2.getValue();
 
     for (DimensionAndMetricValueExtractor arg1Val : arg1Vals) {
-      final String dimensionValue = arg1Val.getStringDimensionValue(dimension);
-      Map<String, Object> retVal = new LinkedHashMap<String, Object>(aggregations.size() + 2);
-      retVal.put(dimension, dimensionValue);
-
-      for (AggregatorFactory factory : aggregations) {
-        final String metricName = factory.getName();
-        retVal.put(metricName, arg1Val.getMetric(metricName));
-      }
-      for (PostAggregator postAgg : postAggregations) {
-        retVal.put(postAgg.getName(), postAgg.compute(retVal));
-      }
-
-      retVals.put(dimensionValue, new DimensionAndMetricValueExtractor(retVal));
+      retVals.put(arg1Val.getStringDimensionValue(dimension), arg1Val);
     }
     for (DimensionAndMetricValueExtractor arg2Val : arg2Vals) {
       final String dimensionValue = arg2Val.getStringDimensionValue(dimension);
@@ -124,18 +112,7 @@
 
         retVals.put(dimensionValue, new DimensionAndMetricValueExtractor(retVal));
       } else {
-        Map<String, Object> retVal = new LinkedHashMap<String, Object>(aggregations.size() + 2);
-        retVal.put(dimension, dimensionValue);
-
-        for (AggregatorFactory factory : aggregations) {
-          final String metricName = factory.getName();
-          retVal.put(metricName, arg2Val.getMetric(metricName));
-        }
-        for (PostAggregator postAgg : postAggregations) {
-          retVal.put(postAgg.getName(), postAgg.compute(retVal));
-        }
-
-        retVals.put(dimensionValue, new DimensionAndMetricValueExtractor(retVal));
+        retVals.put(dimensionValue, arg2Val);
       }
     }
 
diff --git a/processing/src/main/java/io/druid/query/topn/TopNQueryEngine.java b/processing/src/main/java/io/druid/query/topn/TopNQueryEngine.java
index 09a158b..1f3a889 100644
--- a/processing/src/main/java/io/druid/query/topn/TopNQueryEngine.java
+++ b/processing/src/main/java/io/druid/query/topn/TopNQueryEngine.java
@@ -21,7 +21,7 @@
 
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
+import com.metamx.common.ISE;
 import com.metamx.common.guava.FunctionalIterable;
 import com.metamx.common.logger.Logger;
 import io.druid.collections.StupidPool;
@@ -53,6 +53,12 @@
 
   public Iterable<Result<TopNResultValue>> query(final TopNQuery query, final StorageAdapter adapter)
   {
+    if (adapter == null) {
+      throw new ISE(
+          "Null storage adapter found. Probably trying to issue a query against a segment being memory unmapped."
+      );
+    }
+
     final List<Interval> queryIntervals = query.getQuerySegmentSpec().getIntervals();
     final Filter filter = Filters.convertDimensionFilters(query.getDimensionsFilter());
     final QueryGranularity granularity = query.getGranularity();
@@ -62,10 +68,6 @@
         queryIntervals.size() == 1, "Can only handle a single interval, got[%s]", queryIntervals
     );
 
-    if (mapFn == null) {
-      return Lists.newArrayList();
-    }
-
     return FunctionalIterable
         .create(adapter.makeCursors(filter, queryIntervals.get(0), granularity))
         .transform(
@@ -84,13 +86,6 @@
 
   private Function<Cursor, Result<TopNResultValue>> getMapFn(TopNQuery query, final StorageAdapter adapter)
   {
-    if (adapter == null) {
-      log.warn(
-          "Null storage adapter found. Probably trying to issue a query against a segment being memory unmapped. Returning empty results."
-      );
-      return null;
-    }
-
     final Capabilities capabilities = adapter.getCapabilities();
     final int cardinality = adapter.getDimensionCardinality(query.getDimensionSpec().getDimension());
     int numBytesPerRecord = 0;
diff --git a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java
index f290fc2..5db416f 100644
--- a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java
+++ b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java
@@ -208,13 +208,17 @@
                   public Map<String, Object> apply(DimensionAndMetricValueExtractor input)
                   {
                     final Map<String, Object> values = Maps.newHashMap();
-                    // compute all post aggs
+                    // put non-finalized aggregator values in first; dependent post-aggregators are computed from them
+                    for (AggregatorFactory agg : query.getAggregatorSpecs()) {
+                      values.put(agg.getName(), input.getMetric(agg.getName()));
+                    }
+
                     for (PostAggregator postAgg : query.getPostAggregatorSpecs()) {
                       Object calculatedPostAgg = input.getMetric(postAgg.getName());
                       if (calculatedPostAgg != null) {
                         values.put(postAgg.getName(), calculatedPostAgg);
                       } else {
-                        values.put(postAgg.getName(), postAgg.compute(input.getBaseObject()));
+                        values.put(postAgg.getName(), postAgg.compute(values));
                       }
                     }
                     for (AggregatorFactory agg : query.getAggregatorSpecs()) {
@@ -249,6 +253,11 @@
     return new CacheStrategy<Result<TopNResultValue>, Object, TopNQuery>()
     {
       private final List<AggregatorFactory> aggs = query.getAggregatorSpecs();
+      private final List<PostAggregator> postAggs = AggregatorUtil.pruneDependentPostAgg(
+          query.getPostAggregatorSpecs(),
+          query.getTopNMetricSpec()
+               .getMetricName(query.getDimensionSpec())
+      );
 
       @Override
       public byte[] computeCacheKey(TopNQuery query)
@@ -338,6 +347,10 @@
                 vals.put(factory.getName(), factory.deserialize(resultIter.next()));
               }
 
+              for (PostAggregator postAgg : postAggs) {
+                vals.put(postAgg.getName(), postAgg.compute(vals));
+              }
+
               retVal.add(vals);
             }
 
diff --git a/processing/src/test/java/io/druid/query/topn/TopNBinaryFnTest.java b/processing/src/test/java/io/druid/query/topn/TopNBinaryFnTest.java
index 1caf761..c8958dc 100644
--- a/processing/src/test/java/io/druid/query/topn/TopNBinaryFnTest.java
+++ b/processing/src/test/java/io/druid/query/topn/TopNBinaryFnTest.java
@@ -294,17 +294,20 @@
                 ImmutableMap.<String, Object>of(
                     "rows", 1L,
                     "index", 2L,
-                    "testdim", "1"
+                    "testdim", "1",
+                    "addrowsindexconstant", 3.0
                 ),
                 ImmutableMap.<String, Object>of(
                     "rows", 2L,
                     "index", 4L,
-                    "testdim", "2"
+                    "testdim", "2",
+                    "addrowsindexconstant", 7.0
                 ),
                 ImmutableMap.<String, Object>of(
                     "rows", 0L,
                     "index", 2L,
-                    "testdim", "3"
+                    "testdim", "3",
+                    "addrowsindexconstant", 3.0
                 )
             )
         )
@@ -316,17 +319,20 @@
                 ImmutableMap.<String, Object>of(
                     "rows", 2L,
                     "index", 3L,
-                    "testdim", "1"
+                    "testdim", "1",
+                    "addrowsindexconstant", 6.0
                 ),
                 ImmutableMap.<String, Object>of(
                     "rows", 2L,
                     "index", 0L,
-                    "testdim", "2"
+                    "testdim", "2",
+                    "addrowsindexconstant", 3.0
                 ),
                 ImmutableMap.<String, Object>of(
                     "rows", 4L,
                     "index", 5L,
-                    "testdim", "other"
+                    "testdim", "other",
+                    "addrowsindexconstant", 10.0
                 )
             )
         )
@@ -434,7 +440,8 @@
                 ImmutableMap.<String, Object>of(
                     "testdim", "2",
                     "rows", 4L,
-                    "index", 4L                )
+                    "index", 4L
+                )
             )
         )
     );
diff --git a/server/src/main/java/io/druid/server/QueryResource.java b/server/src/main/java/io/druid/server/QueryResource.java
index ea00204..77d0c1b 100644
--- a/server/src/main/java/io/druid/server/QueryResource.java
+++ b/server/src/main/java/io/druid/server/QueryResource.java
@@ -29,8 +29,7 @@
 import com.google.inject.Inject;
 import com.metamx.common.guava.Sequence;
 import com.metamx.common.guava.Sequences;
-import com.metamx.common.logger.Logger;
-import com.metamx.emitter.service.AlertEvent;
+import com.metamx.emitter.EmittingLogger;
 import com.metamx.emitter.service.ServiceEmitter;
 import com.metamx.emitter.service.ServiceMetricEvent;
 import io.druid.guice.annotations.Json;
@@ -57,7 +56,7 @@
 @Path("/druid/v2/")
 public class QueryResource
 {
-  private static final Logger log = new Logger(QueryResource.class);
+  private static final EmittingLogger log = new EmittingLogger(QueryResource.class);
   private static final Charset UTF8 = Charset.forName("UTF-8");
   private static final Joiner COMMA_JOIN = Joiner.on(",");
 
@@ -192,16 +191,11 @@
         log.error(e2, "Unable to log query [%s]!", queryString);
       }
 
-      emitter.emit(
-          new AlertEvent.Builder().build(
-              "Exception handling request",
-              ImmutableMap.<String, Object>builder()
-                          .put("exception", e.toString())
-                          .put("query", queryString)
-                          .put("peer", req.getRemoteAddr())
-                          .build()
-          )
-      );
+      log.makeAlert(e, "Exception handling request")
+         .addData("exception", e.toString())
+         .addData("query", queryString)
+         .addData("peer", req.getRemoteAddr())
+         .emit();
     }
     finally {
       resp.flushBuffer();
diff --git a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java
index 964e87b..582d0c6 100644
--- a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java
+++ b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java
@@ -61,6 +61,7 @@
 import io.druid.query.aggregation.LongSumAggregatorFactory;
 import io.druid.query.aggregation.PostAggregator;
 import io.druid.query.aggregation.post.ArithmeticPostAggregator;
+import io.druid.query.aggregation.post.ConstantPostAggregator;
 import io.druid.query.aggregation.post.FieldAccessPostAggregator;
 import io.druid.query.filter.DimFilter;
 import io.druid.query.search.SearchQueryQueryToolChest;
@@ -145,6 +146,22 @@
               new FieldAccessPostAggregator("imps", "imps"),
               new FieldAccessPostAggregator("rows", "rows")
           )
+      ),
+      new ArithmeticPostAggregator(
+          "avg_imps_per_row_double",
+          "*",
+          Arrays.<PostAggregator>asList(
+              new FieldAccessPostAggregator("avg_imps_per_row", "avg_imps_per_row"),
+              new ConstantPostAggregator("constant", 2, 2 )
+          )
+      ),
+      new ArithmeticPostAggregator(
+          "avg_imps_per_row_half",
+          "/",
+          Arrays.<PostAggregator>asList(
+              new FieldAccessPostAggregator("avg_imps_per_row", "avg_imps_per_row"),
+              new ConstantPostAggregator("constant", 2, 2 )
+          )
       )
   );
   private static final List<AggregatorFactory> RENAMED_AGGS = Arrays.asList(
@@ -412,7 +429,7 @@
             new DateTime("2011-01-06"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989,
             new DateTime("2011-01-07"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989,
             new DateTime("2011-01-08"), "a", 50, 4988, "b", 50, 4987, "c", 50, 4986,
-            new DateTime("2011-01-09"), "a", 50, 4985, "b", 50, 4984, "c", 50, 4983
+            new DateTime("2011-01-09"), "c1", 50, 4985, "b", 50, 4984, "c", 50, 4983
         ),
 
         new Interval("2011-01-05/2011-01-10"),
@@ -421,7 +438,7 @@
             new DateTime("2011-01-06T01"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989,
             new DateTime("2011-01-07T01"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989,
             new DateTime("2011-01-08T01"), "a", 50, 4988, "b", 50, 4987, "c", 50, 4986,
-            new DateTime("2011-01-09T01"), "a", 50, 4985, "b", 50, 4984, "c", 50, 4983
+            new DateTime("2011-01-09T01"), "c2", 50, 4985, "b", 50, 4984, "c", 50, 4983
         )
     );
 
@@ -437,8 +454,8 @@
             new DateTime("2011-01-07T01"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989,
             new DateTime("2011-01-08"), "a", 50, 4988, "b", 50, 4987, "c", 50, 4986,
             new DateTime("2011-01-08T01"), "a", 50, 4988, "b", 50, 4987, "c", 50, 4986,
-            new DateTime("2011-01-09"), "a", 50, 4985, "b", 50, 4984, "c", 50, 4983,
-            new DateTime("2011-01-09T01"), "a", 50, 4985, "b", 50, 4984, "c", 50, 4983
+            new DateTime("2011-01-09"), "c1", 50, 4985, "b", 50, 4984, "c", 50, 4983,
+            new DateTime("2011-01-09T01"), "c2", 50, 4985, "b", 50, 4984, "c", 50, 4983
         ),
         runner.run(
             builder.intervals("2011-01-01/2011-01-10")
@@ -568,6 +585,73 @@
   }
 
   @Test
+  public  void testTopNOnPostAggMetricCaching() {
+    final TopNQueryBuilder builder = new TopNQueryBuilder()
+        .dataSource(DATA_SOURCE)
+        .dimension(TOP_DIM)
+        .metric("avg_imps_per_row_double")
+        .threshold(3)
+        .intervals(SEG_SPEC)
+        .filters(DIM_FILTER)
+        .granularity(GRANULARITY)
+        .aggregators(AGGS)
+        .postAggregators(POST_AGGS)
+        .context(CONTEXT);
+
+    QueryRunner runner = new FinalizeResultsQueryRunner(client, new TopNQueryQueryToolChest(new TopNQueryConfig()));
+    testQueryCaching(
+        runner,
+        builder.build(),
+        new Interval("2011-01-01/2011-01-02"),
+        makeTopNResults(),
+
+        new Interval("2011-01-02/2011-01-03"),
+        makeTopNResults(),
+
+        new Interval("2011-01-05/2011-01-10"),
+        makeTopNResults(
+            new DateTime("2011-01-05"), "a", 50, 4994, "b", 50, 4993, "c", 50, 4992,
+            new DateTime("2011-01-06"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989,
+            new DateTime("2011-01-07"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989,
+            new DateTime("2011-01-08"), "a", 50, 4988, "b", 50, 4987, "c", 50, 4986,
+            new DateTime("2011-01-09"), "c1", 50, 4985, "b", 50, 4984, "c", 50, 4983
+        ),
+
+        new Interval("2011-01-05/2011-01-10"),
+        makeTopNResults(
+            new DateTime("2011-01-05T01"), "a", 50, 4994, "b", 50, 4993, "c", 50, 4992,
+            new DateTime("2011-01-06T01"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989,
+            new DateTime("2011-01-07T01"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989,
+            new DateTime("2011-01-08T01"), "a", 50, 4988, "b", 50, 4987, "c", 50, 4986,
+            new DateTime("2011-01-09T01"), "c2", 50, 4985, "b", 50, 4984, "c", 50, 4983
+        )
+    );
+
+
+    TestHelper.assertExpectedResults(
+        makeTopNResults(
+            new DateTime("2011-01-05"), "a", 50, 4994, "b", 50, 4993, "c", 50, 4992,
+            new DateTime("2011-01-05T01"), "a", 50, 4994, "b", 50, 4993, "c", 50, 4992,
+            new DateTime("2011-01-06"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989,
+            new DateTime("2011-01-06T01"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989,
+            new DateTime("2011-01-07"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989,
+            new DateTime("2011-01-07T01"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989,
+            new DateTime("2011-01-08"), "a", 50, 4988, "b", 50, 4987, "c", 50, 4986,
+            new DateTime("2011-01-08T01"), "a", 50, 4988, "b", 50, 4987, "c", 50, 4986,
+            new DateTime("2011-01-09"), "c1", 50, 4985, "b", 50, 4984, "c", 50, 4983,
+            new DateTime("2011-01-09T01"), "c2", 50, 4985, "b", 50, 4984, "c", 50, 4983
+        ),
+        runner.run(
+            builder.intervals("2011-01-01/2011-01-10")
+                   .metric("avg_imps_per_row_double")
+                   .aggregators(AGGS)
+                   .postAggregators(POST_AGGS)
+                   .build()
+        )
+    );
+  }
+
+  @Test
   public void testSearchCaching() throws Exception
   {
     testQueryCaching(
@@ -1007,20 +1091,22 @@
 
     List<Result<TimeseriesResultValue>> retVal = Lists.newArrayListWithCapacity(objects.length / 3);
     for (int i = 0; i < objects.length; i += 3) {
+      final double avgImpsPerRow = ((Number) objects[i + 2]).doubleValue() / ((Number) objects[i + 1]).doubleValue();
       retVal.add(
           new Result<>(
               (DateTime) objects[i],
               new TimeseriesResultValue(
-                  ImmutableMap.of(
-                      "rows", objects[i + 1],
-                      "imps", objects[i + 2],
-                      "impers", objects[i + 2],
-                      "avg_imps_per_row",
-                      ((Number) objects[i + 2]).doubleValue() / ((Number) objects[i + 1]).doubleValue()
+                  ImmutableMap.<String, Object>builder()
+                      .put("rows", objects[i + 1])
+                      .put("imps", objects[i + 2])
+                      .put("impers", objects[i + 2])
+                      .put("avg_imps_per_row", avgImpsPerRow) // imps / rows, computed once above
+                      .put("avg_imps_per_row_half", avgImpsPerRow / 2)
+                      .put("avg_imps_per_row_double", avgImpsPerRow * 2)
+                      .build()
-                  )
               )
           )
       );
     }
     return retVal;
   }
@@ -1099,13 +1185,19 @@
         final double imps = ((Number) objects[index + 2]).doubleValue();
         final double rows = ((Number) objects[index + 1]).doubleValue();
+        // Derived averages, named up front so the map below reads as plain key/value pairs.
+        final double avgPerRow = imps / rows;
+        final double avgPerRowDoubled = (imps * 2) / rows;
+        final double avgPerRowHalved = imps / (rows * 2);
         values.add(
-            ImmutableMap.of(
-                TOP_DIM, objects[index],
-                "rows", rows,
-                "imps", imps,
-                "impers", imps,
-                "avg_imps_per_row", imps / rows
-            )
+            ImmutableMap.<String, Object>builder()
+                .put(TOP_DIM, objects[index])
+                .put("rows", rows)
+                .put("imps", imps)
+                .put("impers", imps)
+                .put("avg_imps_per_row", avgPerRow)
+                .put("avg_imps_per_row_double", avgPerRowDoubled)
+                .put("avg_imps_per_row_half", avgPerRowHalved)
+                .build()
         );
         index += 3;
       }
diff --git a/services/src/main/java/io/druid/cli/PullDependencies.java b/services/src/main/java/io/druid/cli/PullDependencies.java
index 1131a42..c6d7a84 100644
--- a/services/src/main/java/io/druid/cli/PullDependencies.java
+++ b/services/src/main/java/io/druid/cli/PullDependencies.java
@@ -19,7 +19,17 @@
 
 package io.druid.cli;
 
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+import com.google.inject.Inject;
 import io.airlift.command.Command;
+import io.airlift.command.Option;
+import io.druid.indexing.common.task.HadoopIndexTask;
+import io.druid.initialization.Initialization;
+import io.druid.server.initialization.ExtensionsConfig;
+import io.tesla.aether.internal.DefaultTeslaAether;
+
+import java.util.List;
 
 
 @Command(
@@ -28,8 +38,54 @@
 )
 public class PullDependencies implements Runnable
 {
+  /** Extra coordinates supplied with -c; may be null, in which case run() adds none of them. */
+  @Option(name = "-c",
+          title = "coordinate",
+          description = "extra dependencies to pull down (e.g. hadoop coordinates)",
+          required = false)
+  public List<String> coordinates;
+
+  /** When true, HadoopIndexTask.DEFAULT_HADOOP_COORDINATES is not added to the pull list. */
+  @Option(name = "--no-default-hadoop",
+          description = "don't pull down the default HadoopIndexTask dependencies",
+          required = false)
+  public boolean noDefaultHadoop;
+
+  /** Handed to Initialization.getAetherClient() when run() executes. */
+  @Inject
+  public ExtensionsConfig extensionsConfig = null;
+
+  /**
+   * Asks Initialization for a class loader for every gathered coordinate, reusing one Aether client.
+   * Druid dependencies are pulled down as a side-effect of Guice injection; extra dependencies are pulled
+   * down as a side-effect of getting class loaders. Exceptions raised in that step go through
+   * Throwables.propagate().
+   */
   @Override
-  public void run() {
-    // dependencies are pulled down as a side-effect of Guice injection
+  public void run()
+  {
+    final List<String> requested = gatherCoordinates();
+    try {
+      final DefaultTeslaAether aether = Initialization.getAetherClient(extensionsConfig);
+      for (final String requestedCoordinate : requested) {
+        // invoked only for the side-effect described in the method comment
+        Initialization.getClassLoaderForCoordinates(aether, requestedCoordinate);
+      }
+    }
+    catch (Exception failure) {
+      throw Throwables.propagate(failure);
+    }
   }
+
+  /** Builds the pull list: the -c coordinates first, then the default Hadoop coordinates unless disabled. */
+  private List<String> gatherCoordinates()
+  {
+    final List<String> gathered = Lists.newArrayList();
+    if (coordinates != null) {
+      gathered.addAll(coordinates);
+    }
+    if (!noDefaultHadoop) {
+      gathered.add(HadoopIndexTask.DEFAULT_HADOOP_COORDINATES);
+    }
+    return gathered;
+  }
 }