Merge pull request #456 from metamx/processing-threads-default

more sensible defaults
diff --git a/pom.xml b/pom.xml
index 02575cd..d1b94e4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -18,7 +18,8 @@
   ~ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
   -->
 
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
     <modelVersion>4.0.0</modelVersion>
     <groupId>io.druid</groupId>
     <artifactId>druid</artifactId>
@@ -313,17 +314,17 @@
             <dependency>
                 <groupId>org.eclipse.jetty</groupId>
                 <artifactId>jetty-server</artifactId>
-                <version>8.1.11.v20130520</version>
+                <version>9.1.3.v20140225</version>
             </dependency>
             <dependency>
                 <groupId>org.eclipse.jetty</groupId>
                 <artifactId>jetty-servlet</artifactId>
-                <version>8.1.11.v20130520</version>
+                <version>9.1.3.v20140225</version>
             </dependency>
             <dependency>
                 <groupId>org.eclipse.jetty</groupId>
                 <artifactId>jetty-servlets</artifactId>
-                <version>8.1.11.v20130520</version>
+                <version>9.1.3.v20140225</version>
             </dependency>
             <dependency>
                 <groupId>joda-time</groupId>
diff --git a/server/src/main/java/io/druid/client/CachePopulatingQueryRunner.java b/server/src/main/java/io/druid/client/CachePopulatingQueryRunner.java
index a999794..22d609d 100644
--- a/server/src/main/java/io/druid/client/CachePopulatingQueryRunner.java
+++ b/server/src/main/java/io/druid/client/CachePopulatingQueryRunner.java
@@ -20,6 +20,8 @@
 package io.druid.client;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.metamx.common.guava.Accumulator;
 import com.metamx.common.guava.Sequence;
@@ -33,6 +35,7 @@
 import io.druid.query.QueryToolChest;
 import io.druid.query.SegmentDescriptor;
 
+import javax.annotation.Nullable;
 import java.util.ArrayList;
 
 public class CachePopulatingQueryRunner<T> implements QueryRunner<T>
@@ -72,26 +75,33 @@
 
     final boolean populateCache = Boolean.parseBoolean(query.getContextValue(CacheConfig.POPULATE_CACHE, "true"))
                                   && strategy != null
-                                  && cacheConfig.isPopulateCache()
-                                  // historical only populates distributed cache since the cache lookups are done at broker.
-                                  && !(cache instanceof MapCache);
+                                  && cacheConfig.isPopulateCache();
+
+    final Sequence<T> results = base.run(query);
+
     if (populateCache) {
-      Sequence<T> results = base.run(query);
-      Cache.NamedKey key = CacheUtil.computeSegmentCacheKey(
+      final Cache.NamedKey key = CacheUtil.computeSegmentCacheKey(
           segmentIdentifier,
           segmentDescriptor,
           strategy.computeCacheKey(query)
       );
-      ArrayList<T> resultAsList = Sequences.toList(results, new ArrayList<T>());
-      CacheUtil.populate(
-          cache,
-          mapper,
-          key,
-          Lists.transform(resultAsList, strategy.prepareForCache())
+
+      final Function cacheFn = strategy.prepareForCache();
+      return Sequences.map(
+          results,
+          new Function<T, T>()
+          {
+            @Nullable
+            @Override
+            public T apply(@Nullable T input)
+            {
+              CacheUtil.populate(cache, mapper, key, ImmutableList.of(cacheFn.apply(input)));
+              return input;
+            }
+          }
       );
-      return Sequences.simple(resultAsList);
     } else {
-      return base.run(query);
+      return results;
     }
   }
 
diff --git a/server/src/main/java/io/druid/client/RoutingDruidClient.java b/server/src/main/java/io/druid/client/RoutingDruidClient.java
new file mode 100644
index 0000000..9fd3e2b
--- /dev/null
+++ b/server/src/main/java/io/druid/client/RoutingDruidClient.java
@@ -0,0 +1,113 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012, 2013  Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+
+package io.druid.client;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.smile.SmileFactory;
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.metamx.common.logger.Logger;
+import com.metamx.http.client.HttpClient;
+import com.metamx.http.client.response.HttpResponseHandler;
+import io.druid.guice.annotations.Global;
+import io.druid.query.Query;
+import org.jboss.netty.handler.codec.http.HttpHeaders;
+
+import javax.inject.Inject;
+import java.io.IOException;
+import java.net.URL;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * HTTP client that POSTs a query to a single host and keeps count of the requests it has issued
+ * whose futures have not completed yet.
+ */
+public class RoutingDruidClient<IntermediateType, FinalType>
+{
+  private static final Logger log = new Logger(RoutingDruidClient.class);
+
+  private final ObjectMapper objectMapper;
+  private final HttpClient httpClient;
+  private final AtomicInteger openConnections = new AtomicInteger();
+  private final boolean isSmile;
+
+  @Inject
+  public RoutingDruidClient(ObjectMapper objectMapper, @Global HttpClient httpClient)
+  {
+    this.objectMapper = objectMapper;
+    this.httpClient = httpClient;
+    this.isSmile = objectMapper.getFactory() instanceof SmileFactory;
+  }
+
+  /** Returns how many requests started by {@link #run} are still awaiting completion. */
+  public int getNumOpenConnections()
+  {
+    return openConnections.get();
+  }
+
+  /**
+   * POSTs the serialized query to {@code http://<host>/druid/v2/}, passing the response to the handler.
+   * Any IOException caught along the way is rethrown via Throwables.propagate.
+   */
+  public ListenableFuture<FinalType> run(
+      String host,
+      Query query,
+      HttpResponseHandler<IntermediateType, FinalType> responseHandler
+  )
+  {
+    final String url = String.format("http://%s/druid/v2/", host);
+    final String contentType = isSmile ? "application/smile" : "application/json";
+
+    try {
+      log.debug("Querying url[%s]", url);
+      final ListenableFuture<FinalType> pending = httpClient
+          .post(new URL(url))
+          .setContent(objectMapper.writeValueAsBytes(query))
+          .setHeader(HttpHeaders.Names.CONTENT_TYPE, contentType)
+          .go(responseHandler);
+
+      openConnections.getAndIncrement();
+      // Success and failure both release the slot taken above.
+      Futures.addCallback(
+          pending,
+          new FutureCallback<FinalType>()
+          {
+            @Override
+            public void onSuccess(FinalType result)
+            {
+              openConnections.getAndDecrement();
+            }
+
+            @Override
+            public void onFailure(Throwable t)
+            {
+              openConnections.getAndDecrement();
+            }
+          }
+      );
+      return pending;
+    }
+    catch (IOException e) {
+      throw Throwables.propagate(e);
+    }
+  }
+}
diff --git a/server/src/main/java/io/druid/client/selector/HostSelector.java b/server/src/main/java/io/druid/client/selector/HostSelector.java
new file mode 100644
index 0000000..dd8f366a
--- /dev/null
+++ b/server/src/main/java/io/druid/client/selector/HostSelector.java
@@ -0,0 +1,33 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012, 2013  Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+
+package io.druid.client.selector;
+
+import com.metamx.common.Pair;
+import io.druid.curator.discovery.ServerDiscoverySelector;
+import io.druid.query.Query;
+
+/** Chooses, for a query, a service name together with the discovery selector to use for it. */
+public interface HostSelector<T>
+{
+  public String getDefaultServiceName();
+
+  /** Returns the selected service name (lhs) paired with its ServerDiscoverySelector (rhs). */
+  public Pair<String, ServerDiscoverySelector> select(Query<T> query);
+}
diff --git a/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java b/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java
new file mode 100644
index 0000000..85f33a7
--- /dev/null
+++ b/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java
@@ -0,0 +1,238 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012  Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+
+package io.druid.server;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.api.client.repackaged.com.google.common.base.Throwables;
+import com.metamx.emitter.EmittingLogger;
+import com.metamx.emitter.service.ServiceEmitter;
+import com.metamx.emitter.service.ServiceMetricEvent;
+import com.metamx.http.client.response.ClientResponse;
+import com.metamx.http.client.response.HttpResponseHandler;
+import io.druid.client.RoutingDruidClient;
+import io.druid.guice.annotations.Json;
+import io.druid.guice.annotations.Smile;
+import io.druid.query.Query;
+import io.druid.server.log.RequestLogger;
+import io.druid.server.router.QueryHostFinder;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.handler.codec.http.HttpChunk;
+import org.jboss.netty.handler.codec.http.HttpResponse;
+import org.joda.time.DateTime;
+
+import javax.servlet.AsyncContext;
+import javax.servlet.ServletException;
+import javax.servlet.annotation.WebServlet;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.Charset;
+
+/**
+ * Servlet that forwards a POSTed query to the host picked by {@link QueryHostFinder} and relays that
+ * host's response to the caller through the servlet async API.
+ */
+@WebServlet(asyncSupported = true)
+public class AsyncQueryForwardingServlet extends HttpServlet
+{
+  private static final EmittingLogger log = new EmittingLogger(AsyncQueryForwardingServlet.class);
+  private static final Charset UTF8 = Charset.forName("UTF-8");
+  private static final String DISPATCHED = "dispatched";
+
+  private final ObjectMapper jsonMapper;
+  private final ObjectMapper smileMapper;
+  private final QueryHostFinder hostFinder;
+  private final RoutingDruidClient routingDruidClient;
+  private final ServiceEmitter emitter;
+  private final RequestLogger requestLogger;
+  private final QueryIDProvider idProvider;
+
+  public AsyncQueryForwardingServlet(
+      @Json ObjectMapper jsonMapper,
+      @Smile ObjectMapper smileMapper,
+      QueryHostFinder hostFinder,
+      RoutingDruidClient routingDruidClient,
+      ServiceEmitter emitter,
+      RequestLogger requestLogger,
+      QueryIDProvider idProvider
+  )
+  {
+    this.jsonMapper = jsonMapper;
+    this.smileMapper = smileMapper;
+    this.hostFinder = hostFinder;
+    this.routingDruidClient = routingDruidClient;
+    this.emitter = emitter;
+    this.requestLogger = requestLogger;
+    this.idProvider = idProvider;
+  }
+
+  /**
+   * Parses the query, picks a host and relays that host's response to the caller asynchronously. A
+   * failure before the relay starts is answered with a 500 whose body is the exception message.
+   */
+  @Override
+  protected void doPost(final HttpServletRequest req, final HttpServletResponse resp)
+      throws ServletException, IOException
+  {
+    // ctx.dispatch() in done() sends the request through this method a second time. Return before
+    // opening another async cycle, which nothing would ever complete.
+    if (req.getAttribute(DISPATCHED) != null) {
+      return;
+    }
+
+    final long start = System.currentTimeMillis();
+    final boolean isSmile = "application/smile".equals(req.getContentType());
+    final ObjectMapper objectMapper = isSmile ? smileMapper : jsonMapper;
+
+    Query query = null;
+    String queryId;
+    OutputStream out = null;
+
+    try {
+      final AsyncContext ctx = req.startAsync(req, resp);
+
+      req.setAttribute(DISPATCHED, true);
+      resp.setStatus(200);
+      resp.setContentType("application/x-javascript");
+
+      query = objectMapper.readValue(req.getInputStream(), Query.class);
+      queryId = query.getId();
+      if (queryId == null) {
+        queryId = idProvider.next(query);
+        query = query.withId(queryId);
+      }
+
+      requestLogger.log(new RequestLogLine(new DateTime(), req.getRemoteAddr(), query));
+      out = resp.getOutputStream();
+      final OutputStream outputStream = out;
+
+      final String host = hostFinder.getHost(query);
+      if (host == null) {
+        // getHost() returns null when no server was found; fail now instead of POSTing to "null".
+        throw new IllegalStateException("No servers found to forward the query to");
+      }
+
+      final Query theQuery = query;
+      final String theQueryId = queryId;
+
+      final HttpResponseHandler<OutputStream, OutputStream> responseHandler = new HttpResponseHandler<OutputStream, OutputStream>()
+      {
+        @Override
+        public ClientResponse<OutputStream> handleResponse(HttpResponse response)
+        {
+          relay(response.getContent(), outputStream);
+          return ClientResponse.finished(outputStream);
+        }
+
+        @Override
+        public ClientResponse<OutputStream> handleChunk(ClientResponse<OutputStream> clientResponse, HttpChunk chunk)
+        {
+          relay(chunk.getContent(), clientResponse.getObj());
+          return clientResponse;
+        }
+
+        @Override
+        public ClientResponse<OutputStream> done(ClientResponse<OutputStream> clientResponse)
+        {
+          final OutputStream obj = clientResponse.getObj();
+          try {
+            final long requestTime = System.currentTimeMillis() - start;
+            log.info("Request time: %d", requestTime);
+            emitter.emit(
+                new ServiceMetricEvent.Builder()
+                    .setUser2(theQuery.getDataSource().getName())
+                    .setUser4(theQuery.getType())
+                    .setUser5(theQuery.getIntervals().get(0).toString())
+                    .setUser6(String.valueOf(theQuery.hasFilters()))
+                    .setUser7(req.getRemoteAddr())
+                    .setUser8(theQueryId)
+                    .setUser9(theQuery.getDuration().toPeriod().toStandardMinutes().toString())
+                    .build("request/time", requestTime)
+            );
+
+            resp.flushBuffer();
+            outputStream.close();
+          }
+          catch (Exception e) {
+            throw Throwables.propagate(e);
+          }
+          finally {
+            // Also reached when metric emission or the flush throws, so the request is not left parked.
+            ctx.dispatch();
+          }
+          return ClientResponse.finished(obj);
+        }
+
+        /** Copies the readable bytes of {@code content} to {@code sink}; empty buffers are skipped. */
+        private void relay(ChannelBuffer content, OutputStream sink)
+        {
+          final byte[] bytes = new byte[content.readableBytes()];
+          content.readBytes(bytes);
+          if (bytes.length > 0) {
+            try {
+              sink.write(bytes);
+            }
+            catch (Exception e) {
+              throw Throwables.propagate(e);
+            }
+          }
+        }
+      };
+
+      ctx.start(
+          new Runnable()
+          {
+            @Override
+            public void run()
+            {
+              routingDruidClient.run(host, theQuery, responseHandler);
+            }
+          }
+      );
+    }
+    catch (Exception e) {
+      try {
+        if (!resp.isCommitted()) {
+          resp.setStatus(500);
+          resp.resetBuffer();
+          if (out == null) {
+            out = resp.getOutputStream();
+          }
+          out.write((e.getMessage() == null) ? "Exception null".getBytes(UTF8) : e.getMessage().getBytes(UTF8));
+          out.write("\n".getBytes(UTF8));
+        }
+
+        resp.flushBuffer();
+        log.makeAlert(e, "Exception handling request")
+           .addData("query", query)
+           .addData("peer", req.getRemoteAddr())
+           .emit();
+      }
+      finally {
+        // Otherwise the async cycle opened above stays open until the container times it out.
+        if (req.isAsyncStarted()) {
+          req.getAsyncContext().complete();
+        }
+      }
+    }
+  }
+}
diff --git a/server/src/main/java/io/druid/server/QueryResource.java b/server/src/main/java/io/druid/server/QueryResource.java
index 051a56e..a69a385 100644
--- a/server/src/main/java/io/druid/server/QueryResource.java
+++ b/server/src/main/java/io/druid/server/QueryResource.java
@@ -136,8 +136,8 @@
               .setUser5(query.getIntervals().get(0).toString())
               .setUser6(String.valueOf(query.hasFilters()))
               .setUser7(req.getRemoteAddr())
+              .setUser8(queryId)
               .setUser9(query.getDuration().toPeriod().toStandardMinutes().toString())
-              .setUser10(queryId)
               .build("request/time", requestTime)
       );
     }
diff --git a/server/src/main/java/io/druid/server/initialization/JettyServerModule.java b/server/src/main/java/io/druid/server/initialization/JettyServerModule.java
index 246645d..0b93dce 100644
--- a/server/src/main/java/io/druid/server/initialization/JettyServerModule.java
+++ b/server/src/main/java/io/druid/server/initialization/JettyServerModule.java
@@ -49,7 +49,7 @@
 import io.druid.server.StatusResource;
 import org.eclipse.jetty.server.Connector;
 import org.eclipse.jetty.server.Server;
-import org.eclipse.jetty.server.nio.SelectChannelConnector;
+import org.eclipse.jetty.server.ServerConnector;
 import org.eclipse.jetty.util.thread.QueuedThreadPool;
 
 import javax.servlet.ServletException;
@@ -154,13 +154,11 @@
     threadPool.setMinThreads(config.getNumThreads());
     threadPool.setMaxThreads(config.getNumThreads());
 
-    final Server server = new Server();
-    server.setThreadPool(threadPool);
+    final Server server = new Server(threadPool);
 
-    SelectChannelConnector connector = new SelectChannelConnector();
+    ServerConnector connector = new ServerConnector(server);
     connector.setPort(node.getPort());
-    connector.setMaxIdleTime(Ints.checkedCast(config.getMaxIdleTime().toStandardDuration().getMillis()));
-    connector.setStatsOn(true);
+    connector.setIdleTimeout(Ints.checkedCast(config.getMaxIdleTime().toStandardDuration().getMillis()));
 
     server.setConnectors(new Connector[]{connector});
 
diff --git a/server/src/main/java/io/druid/server/router/CoordinatorRuleManager.java b/server/src/main/java/io/druid/server/router/CoordinatorRuleManager.java
index 9acf5f9..adf8e6f 100644
--- a/server/src/main/java/io/druid/server/router/CoordinatorRuleManager.java
+++ b/server/src/main/java/io/druid/server/router/CoordinatorRuleManager.java
@@ -57,7 +57,7 @@
 
   private final HttpClient httpClient;
   private final ObjectMapper jsonMapper;
-  private final Supplier<TierConfig> config;
+  private final Supplier<TieredBrokerConfig> config;
   private final ServerDiscoverySelector selector;
 
   private final StatusResponseHandler responseHandler;
@@ -73,7 +73,7 @@
   public CoordinatorRuleManager(
       @Global HttpClient httpClient,
       @Json ObjectMapper jsonMapper,
-      Supplier<TierConfig> config,
+      Supplier<TieredBrokerConfig> config,
       ServerDiscoverySelector selector
   )
   {
diff --git a/server/src/main/java/io/druid/server/router/QueryHostFinder.java b/server/src/main/java/io/druid/server/router/QueryHostFinder.java
new file mode 100644
index 0000000..dfd94ca
--- /dev/null
+++ b/server/src/main/java/io/druid/server/router/QueryHostFinder.java
@@ -0,0 +1,98 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012, 2013  Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+
+package io.druid.server.router;
+
+import com.google.inject.Inject;
+import com.metamx.common.Pair;
+import com.metamx.emitter.EmittingLogger;
+import io.druid.client.selector.Server;
+import io.druid.curator.discovery.ServerDiscoverySelector;
+import io.druid.query.Query;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Resolves a query to a server, remembering the last server returned for each service name so that
+ * it can be handed out again when the discovery selector has nothing to offer.
+ */
+public class QueryHostFinder<T>
+{
+  private static EmittingLogger log = new EmittingLogger(QueryHostFinder.class);
+
+  private final TieredBrokerHostSelector hostSelector;
+
+  private final ConcurrentHashMap<String, Server> serverBackup = new ConcurrentHashMap<String, Server>();
+
+  @Inject
+  public QueryHostFinder(TieredBrokerHostSelector hostSelector)
+  {
+    this.hostSelector = hostSelector;
+  }
+
+  /**
+   * Picks a server for the query. Tried in order: the selector's own pick, the backup remembered for
+   * the service name, the backup remembered for the default service name. A non-null result is stored
+   * as the backup for the service name; null is returned when all three lookups come up empty.
+   */
+  public Server findServer(Query<T> query)
+  {
+    final Pair<String, ServerDiscoverySelector> choice = hostSelector.select(query);
+    final String serviceName = choice.lhs;
+
+    Server picked = choice.rhs.pick();
+    if (picked != null) {
+      serverBackup.put(serviceName, picked);
+      return picked;
+    }
+
+    log.error("WTF?! No server found for serviceName[%s]. Using backup", serviceName);
+    picked = serverBackup.get(serviceName);
+
+    if (picked == null) {
+      log.error(
+          "WTF?! No backup found for serviceName[%s]. Using default[%s]",
+          serviceName,
+          hostSelector.getDefaultServiceName()
+      );
+      picked = serverBackup.get(hostSelector.getDefaultServiceName());
+    }
+
+    if (picked != null) {
+      serverBackup.put(serviceName, picked);
+    }
+    return picked;
+  }
+
+  /**
+   * Returns the host of the server chosen by {@link #findServer}, or null, after emitting an alert,
+   * when no server could be found.
+   */
+  public String getHost(Query<T> query)
+  {
+    final Server target = findServer(query);
+    if (target == null) {
+      log.makeAlert("Catastrophic failure! No servers found at all! Failing request!").emit();
+      return null;
+    }
+
+    log.info("Selected [%s]", target.getHost());
+    return target.getHost();
+  }
+}
diff --git a/server/src/main/java/io/druid/server/router/RouterQuerySegmentWalker.java b/server/src/main/java/io/druid/server/router/RouterQuerySegmentWalker.java
deleted file mode 100644
index 92bf431..0000000
--- a/server/src/main/java/io/druid/server/router/RouterQuerySegmentWalker.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Druid - a distributed column store.
- * Copyright (C) 2012, 2013  Metamarkets Group Inc.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU General Public License
- * as published by the Free Software Foundation; either version 2
- * of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
- */
-
-package io.druid.server.router;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.inject.Inject;
-import com.metamx.http.client.HttpClient;
-import io.druid.curator.discovery.ServerDiscoveryFactory;
-import io.druid.guice.annotations.Global;
-import io.druid.query.Query;
-import io.druid.query.QueryRunner;
-import io.druid.query.QuerySegmentWalker;
-import io.druid.query.QueryToolChestWarehouse;
-import io.druid.query.SegmentDescriptor;
-import org.joda.time.Interval;
-
-/**
- */
-public class RouterQuerySegmentWalker implements QuerySegmentWalker
-{
-  private final QueryToolChestWarehouse warehouse;
-  private final ObjectMapper objectMapper;
-  private final HttpClient httpClient;
-  private final BrokerSelector brokerSelector;
-  private final TierConfig tierConfig;
-
-  @Inject
-  public RouterQuerySegmentWalker(
-      QueryToolChestWarehouse warehouse,
-      ObjectMapper objectMapper,
-      @Global HttpClient httpClient,
-      BrokerSelector brokerSelector,
-      TierConfig tierConfig
-  )
-  {
-    this.warehouse = warehouse;
-    this.objectMapper = objectMapper;
-    this.httpClient = httpClient;
-    this.brokerSelector = brokerSelector;
-    this.tierConfig = tierConfig;
-  }
-
-  @Override
-  public <T> QueryRunner<T> getQueryRunnerForIntervals(Query<T> query, Iterable<Interval> intervals)
-  {
-    return makeRunner();
-  }
-
-  @Override
-  public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, Iterable<SegmentDescriptor> specs)
-  {
-    return makeRunner();
-  }
-
-  private <T> QueryRunner<T> makeRunner()
-  {
-    return new TierAwareQueryRunner<T>(
-        warehouse,
-        objectMapper,
-        httpClient,
-        brokerSelector,
-        tierConfig
-    );
-  }
-}
diff --git a/server/src/main/java/io/druid/server/router/TierAwareQueryRunner.java b/server/src/main/java/io/druid/server/router/TierAwareQueryRunner.java
deleted file mode 100644
index 3a098e0..0000000
--- a/server/src/main/java/io/druid/server/router/TierAwareQueryRunner.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Druid - a distributed column store.
- * Copyright (C) 2012, 2013  Metamarkets Group Inc.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU General Public License
- * as published by the Free Software Foundation; either version 2
- * of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
- */
-
-package io.druid.server.router;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Throwables;
-import com.metamx.common.Pair;
-import com.metamx.common.guava.Sequence;
-import com.metamx.common.guava.Sequences;
-import com.metamx.emitter.EmittingLogger;
-import com.metamx.http.client.HttpClient;
-import io.druid.client.DirectDruidClient;
-import io.druid.client.selector.Server;
-import io.druid.curator.discovery.ServerDiscoveryFactory;
-import io.druid.curator.discovery.ServerDiscoverySelector;
-import io.druid.query.Query;
-import io.druid.query.QueryRunner;
-import io.druid.query.QueryToolChestWarehouse;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- */
-public class TierAwareQueryRunner<T> implements QueryRunner<T>
-{
-  private static EmittingLogger log = new EmittingLogger(TierAwareQueryRunner.class);
-
-  private final QueryToolChestWarehouse warehouse;
-  private final ObjectMapper objectMapper;
-  private final HttpClient httpClient;
-  private final BrokerSelector<T> brokerSelector;
-  private final TierConfig tierConfig;
-
-  private final ConcurrentHashMap<String, Server> serverBackup = new ConcurrentHashMap<String, Server>();
-
-  public TierAwareQueryRunner(
-      QueryToolChestWarehouse warehouse,
-      ObjectMapper objectMapper,
-      HttpClient httpClient,
-      BrokerSelector<T> brokerSelector,
-      TierConfig tierConfig
-  )
-  {
-    this.warehouse = warehouse;
-    this.objectMapper = objectMapper;
-    this.httpClient = httpClient;
-    this.brokerSelector = brokerSelector;
-    this.tierConfig = tierConfig;
-  }
-
-  public Server findServer(Query<T> query)
-  {
-    final Pair<String, ServerDiscoverySelector> selected = brokerSelector.select(query);
-    final String brokerServiceName = selected.lhs;
-    final ServerDiscoverySelector selector = selected.rhs;
-
-    Server server = selector.pick();
-    if (server == null) {
-      log.error(
-          "WTF?! No server found for brokerServiceName[%s]. Using backup",
-          brokerServiceName
-      );
-
-      server = serverBackup.get(brokerServiceName);
-
-      if (server == null) {
-        log.makeAlert(
-            "WTF?! No backup found for brokerServiceName[%s]. Using default[%s]",
-            brokerServiceName,
-            tierConfig.getDefaultBrokerServiceName()
-        ).emit();
-
-        server = serverBackup.get(tierConfig.getDefaultBrokerServiceName());
-      }
-    } else {
-      serverBackup.put(brokerServiceName, server);
-    }
-
-    return server;
-  }
-
-  @Override
-  public Sequence<T> run(Query<T> query)
-  {
-    Server server = findServer(query);
-
-    if (server == null) {
-      log.makeAlert(
-          "Catastrophic failure! No servers found at all! Failing request!"
-      ).emit();
-      return Sequences.empty();
-    }
-
-    QueryRunner<T> client = new DirectDruidClient<T>(
-        warehouse,
-        objectMapper,
-        httpClient,
-        server.getHost()
-    );
-
-    return client.run(query);
-  }
-}
diff --git a/server/src/main/java/io/druid/server/router/TierConfig.java b/server/src/main/java/io/druid/server/router/TieredBrokerConfig.java
similarity index 98%
rename from server/src/main/java/io/druid/server/router/TierConfig.java
rename to server/src/main/java/io/druid/server/router/TieredBrokerConfig.java
index c819edf..67ff109 100644
--- a/server/src/main/java/io/druid/server/router/TierConfig.java
+++ b/server/src/main/java/io/druid/server/router/TieredBrokerConfig.java
@@ -29,7 +29,7 @@
 
 /**
  */
-public class TierConfig
+public class TieredBrokerConfig
 {
   @JsonProperty
   @NotNull
diff --git a/server/src/main/java/io/druid/server/router/BrokerSelector.java b/server/src/main/java/io/druid/server/router/TieredBrokerHostSelector.java
similarity index 67%
rename from server/src/main/java/io/druid/server/router/BrokerSelector.java
rename to server/src/main/java/io/druid/server/router/TieredBrokerHostSelector.java
index e27acf0..cc625cd 100644
--- a/server/src/main/java/io/druid/server/router/BrokerSelector.java
+++ b/server/src/main/java/io/druid/server/router/TieredBrokerHostSelector.java
@@ -20,20 +20,20 @@
 package io.druid.server.router;
 
 import com.google.common.base.Throwables;
+import com.google.common.collect.Iterables;
 import com.google.inject.Inject;
 import com.metamx.common.Pair;
-import com.metamx.common.concurrent.ScheduledExecutors;
 import com.metamx.common.lifecycle.LifecycleStart;
 import com.metamx.common.lifecycle.LifecycleStop;
 import com.metamx.emitter.EmittingLogger;
-import io.druid.concurrent.Execs;
+import io.druid.client.selector.HostSelector;
 import io.druid.curator.discovery.ServerDiscoveryFactory;
 import io.druid.curator.discovery.ServerDiscoverySelector;
 import io.druid.query.Query;
+import io.druid.query.timeboundary.TimeBoundaryQuery;
 import io.druid.server.coordinator.rules.LoadRule;
 import io.druid.server.coordinator.rules.Rule;
 import org.joda.time.DateTime;
-import org.joda.time.Duration;
 import org.joda.time.Interval;
 
 import java.util.List;
@@ -42,23 +42,23 @@
 
 /**
  */
-public class BrokerSelector<T>
+public class TieredBrokerHostSelector<T> implements HostSelector<T>
 {
-  private static EmittingLogger log = new EmittingLogger(BrokerSelector.class);
+  private static EmittingLogger log = new EmittingLogger(TieredBrokerHostSelector.class);
 
   private final CoordinatorRuleManager ruleManager;
-  private final TierConfig tierConfig;
+  private final TieredBrokerConfig tierConfig;
   private final ServerDiscoveryFactory serverDiscoveryFactory;
-  private final ConcurrentHashMap<String, ServerDiscoverySelector> selectorMap = new ConcurrentHashMap<String, ServerDiscoverySelector>();
+  private final ConcurrentHashMap<String, ServerDiscoverySelector> selectorMap = new ConcurrentHashMap<>();
 
   private final Object lock = new Object();
 
   private volatile boolean started = false;
 
   @Inject
-  public BrokerSelector(
+  public TieredBrokerHostSelector(
       CoordinatorRuleManager ruleManager,
-      TierConfig tierConfig,
+      TieredBrokerConfig tierConfig,
       ServerDiscoveryFactory serverDiscoveryFactory
   )
   {
@@ -112,6 +112,12 @@
     }
   }
 
+  @Override
+  public String getDefaultServiceName()
+  {
+    return tierConfig.getDefaultBrokerServiceName();
+  }
+
   public Pair<String, ServerDiscoverySelector> select(final Query<T> query)
   {
     synchronized (lock) {
@@ -120,35 +126,46 @@
       }
     }
 
-    List<Rule> rules = ruleManager.getRulesWithDefault((query.getDataSource()).getName());
+    String brokerServiceName = null;
 
-    // find the rule that can apply to the entire set of intervals
-    DateTime now = new DateTime();
-    int lastRulePosition = -1;
-    LoadRule baseRule = null;
+    // Hack: a TimeBoundaryQuery always uses the first broker in the tier-to-broker map, or the default broker when the map is empty
+    if (query instanceof TimeBoundaryQuery) {
+      brokerServiceName = Iterables.getFirst(
+          tierConfig.getTierToBrokerMap().values(),
+          tierConfig.getDefaultBrokerServiceName()
+      );
+    }
 
-    for (Interval interval : query.getIntervals()) {
-      int currRulePosition = 0;
-      for (Rule rule : rules) {
-        if (rule instanceof LoadRule && currRulePosition > lastRulePosition && rule.appliesTo(interval, now)) {
-          lastRulePosition = currRulePosition;
-          baseRule = (LoadRule) rule;
+    if (brokerServiceName == null) {
+      List<Rule> rules = ruleManager.getRulesWithDefault((query.getDataSource()).getName());
+
+      // find the rule that can apply to the entire set of intervals
+      DateTime now = new DateTime();
+      int lastRulePosition = -1;
+      LoadRule baseRule = null;
+
+      for (Interval interval : query.getIntervals()) {
+        int currRulePosition = 0;
+        for (Rule rule : rules) {
+          if (rule instanceof LoadRule && currRulePosition > lastRulePosition && rule.appliesTo(interval, now)) {
+            lastRulePosition = currRulePosition;
+            baseRule = (LoadRule) rule;
+            break;
+          }
+          currRulePosition++;
+        }
+      }
+
+      if (baseRule == null) {
+        return null;
+      }
+
+      // in the baseRule, find the broker of highest priority
+      for (Map.Entry<String, String> entry : tierConfig.getTierToBrokerMap().entrySet()) {
+        if (baseRule.getTieredReplicants().containsKey(entry.getKey())) {
+          brokerServiceName = entry.getValue();
           break;
         }
-        currRulePosition++;
-      }
-    }
-
-    if (baseRule == null) {
-      return null;
-    }
-
-    // in the baseRule, find the broker of highest priority
-    String brokerServiceName = null;
-    for (Map.Entry<String, String> entry : tierConfig.getTierToBrokerMap().entrySet()) {
-      if (baseRule.getTieredReplicants().containsKey(entry.getKey())) {
-        brokerServiceName = entry.getValue();
-        break;
       }
     }
 
diff --git a/server/src/test/java/io/druid/client/CachePopulatingQueryRunnerTest.java b/server/src/test/java/io/druid/client/CachePopulatingQueryRunnerTest.java
index b87cb15..bb73c5d 100644
--- a/server/src/test/java/io/druid/client/CachePopulatingQueryRunnerTest.java
+++ b/server/src/test/java/io/druid/client/CachePopulatingQueryRunnerTest.java
@@ -123,7 +123,7 @@
 
     Sequence res = runner.run(builder.build());
     // base sequence is not closed yet
-    Assert.assertTrue(closable.isClosed());
+    Assert.assertFalse("sequence must not be closed", closable.isClosed());
     ArrayList results = Sequences.toList(res, new ArrayList());
     Assert.assertTrue(closable.isClosed());
     Assert.assertEquals(expectedRes, results);
diff --git a/server/src/test/java/io/druid/server/router/TierAwareQueryRunnerTest.java b/server/src/test/java/io/druid/server/router/QueryHostFinderTest.java
similarity index 90%
rename from server/src/test/java/io/druid/server/router/TierAwareQueryRunnerTest.java
rename to server/src/test/java/io/druid/server/router/QueryHostFinderTest.java
index d788daa..e8bf260 100644
--- a/server/src/test/java/io/druid/server/router/TierAwareQueryRunnerTest.java
+++ b/server/src/test/java/io/druid/server/router/QueryHostFinderTest.java
@@ -40,20 +40,20 @@
 
 /**
  */
-public class TierAwareQueryRunnerTest
+public class QueryHostFinderTest
 {
   private ServerDiscoverySelector selector;
-  private BrokerSelector brokerSelector;
-  private TierConfig config;
+  private TieredBrokerHostSelector brokerSelector;
+  private TieredBrokerConfig config;
   private Server server;
 
   @Before
   public void setUp() throws Exception
   {
     selector = EasyMock.createMock(ServerDiscoverySelector.class);
-    brokerSelector = EasyMock.createMock(BrokerSelector.class);
+    brokerSelector = EasyMock.createMock(TieredBrokerHostSelector.class);
 
-    config = new TierConfig()
+    config = new TieredBrokerConfig()
     {
       @Override
       public LinkedHashMap<String, String> getTierToBrokerMap()
@@ -118,12 +118,8 @@
     EasyMock.expect(selector.pick()).andReturn(server).once();
     EasyMock.replay(selector);
 
-    TierAwareQueryRunner queryRunner = new TierAwareQueryRunner(
-        null,
-        null,
-        null,
-        brokerSelector,
-        config
+    QueryHostFinder queryRunner = new QueryHostFinder(
+        brokerSelector
     );
 
     Server server = queryRunner.findServer(
diff --git a/server/src/test/java/io/druid/server/router/BrokerSelectorTest.java b/server/src/test/java/io/druid/server/router/TieredBrokerHostSelectorTest.java
similarity index 84%
rename from server/src/test/java/io/druid/server/router/BrokerSelectorTest.java
rename to server/src/test/java/io/druid/server/router/TieredBrokerHostSelectorTest.java
index 55e5dda..b2a22af 100644
--- a/server/src/test/java/io/druid/server/router/BrokerSelectorTest.java
+++ b/server/src/test/java/io/druid/server/router/TieredBrokerHostSelectorTest.java
@@ -50,11 +50,11 @@
 
 /**
  */
-public class BrokerSelectorTest
+public class TieredBrokerHostSelectorTest
 {
   private ServerDiscoveryFactory factory;
   private ServerDiscoverySelector selector;
-  private BrokerSelector brokerSelector;
+  private TieredBrokerHostSelector brokerSelector;
 
   @Before
   public void setUp() throws Exception
@@ -62,9 +62,9 @@
     factory = EasyMock.createMock(ServerDiscoveryFactory.class);
     selector = EasyMock.createMock(ServerDiscoverySelector.class);
 
-    brokerSelector = new BrokerSelector(
+    brokerSelector = new TieredBrokerHostSelector(
         new TestRuleManager(null, null, null, null),
-        new TierConfig()
+        new TieredBrokerConfig()
         {
           @Override
           public LinkedHashMap<String, String> getTierToBrokerMap()
@@ -112,11 +112,12 @@
   public void testBasicSelect() throws Exception
   {
     String brokerName = (String) brokerSelector.select(
-        new TimeBoundaryQuery(
-            new TableDataSource("test"),
-            new MultipleIntervalSegmentSpec(Arrays.<Interval>asList(new Interval("2011-08-31/2011-09-01"))),
-            null
-        )
+        Druids.newTimeseriesQueryBuilder()
+              .dataSource("test")
+              .granularity("all")
+              .aggregators(Arrays.<AggregatorFactory>asList(new CountAggregatorFactory("rows")))
+              .intervals(Arrays.<Interval>asList(new Interval("2011-08-31/2011-09-01")))
+              .build()
     ).lhs;
 
     Assert.assertEquals("coldBroker", brokerName);
@@ -127,11 +128,12 @@
   public void testBasicSelect2() throws Exception
   {
     String brokerName = (String) brokerSelector.select(
-        new TimeBoundaryQuery(
-            new TableDataSource("test"),
-            new MultipleIntervalSegmentSpec(Arrays.<Interval>asList(new Interval("2013-08-31/2013-09-01"))),
-            null
-        )
+        Druids.newTimeseriesQueryBuilder()
+              .dataSource("test")
+              .granularity("all")
+              .aggregators(Arrays.<AggregatorFactory>asList(new CountAggregatorFactory("rows")))
+              .intervals(Arrays.<Interval>asList(new Interval("2013-08-31/2013-09-01")))
+              .build()
     ).lhs;
 
     Assert.assertEquals("hotBroker", brokerName);
@@ -141,11 +143,12 @@
   public void testSelectMatchesNothing() throws Exception
   {
     Pair retVal = brokerSelector.select(
-        new TimeBoundaryQuery(
-            new TableDataSource("test"),
-            new MultipleIntervalSegmentSpec(Arrays.<Interval>asList(new Interval("2010-08-31/2010-09-01"))),
-            null
-        )
+        Druids.newTimeseriesQueryBuilder()
+              .dataSource("test")
+              .granularity("all")
+              .aggregators(Arrays.<AggregatorFactory>asList(new CountAggregatorFactory("rows")))
+              .intervals(Arrays.<Interval>asList(new Interval("2010-08-31/2010-09-01")))
+              .build()
     );
 
     Assert.assertEquals(null, retVal);
@@ -199,7 +202,7 @@
     public TestRuleManager(
         @Global HttpClient httpClient,
         @Json ObjectMapper jsonMapper,
-        Supplier<TierConfig> config,
+        Supplier<TieredBrokerConfig> config,
         ServerDiscoverySelector selector
     )
     {
diff --git a/services/src/main/java/io/druid/cli/CliRouter.java b/services/src/main/java/io/druid/cli/CliRouter.java
index 740edd8..1d0d100 100644
--- a/services/src/main/java/io/druid/cli/CliRouter.java
+++ b/services/src/main/java/io/druid/cli/CliRouter.java
@@ -25,24 +25,20 @@
 import com.google.inject.Provides;
 import com.metamx.common.logger.Logger;
 import io.airlift.command.Command;
+import io.druid.client.RoutingDruidClient;
 import io.druid.curator.discovery.DiscoveryModule;
 import io.druid.curator.discovery.ServerDiscoveryFactory;
 import io.druid.curator.discovery.ServerDiscoverySelector;
-import io.druid.guice.Jerseys;
 import io.druid.guice.JsonConfigProvider;
 import io.druid.guice.LazySingleton;
 import io.druid.guice.LifecycleModule;
 import io.druid.guice.ManageLifecycle;
 import io.druid.guice.annotations.Self;
-import io.druid.query.MapQueryToolChestWarehouse;
-import io.druid.query.QuerySegmentWalker;
-import io.druid.query.QueryToolChestWarehouse;
-import io.druid.server.QueryResource;
 import io.druid.server.initialization.JettyServerInitializer;
-import io.druid.server.router.BrokerSelector;
 import io.druid.server.router.CoordinatorRuleManager;
-import io.druid.server.router.RouterQuerySegmentWalker;
-import io.druid.server.router.TierConfig;
+import io.druid.server.router.QueryHostFinder;
+import io.druid.server.router.TieredBrokerConfig;
+import io.druid.server.router.TieredBrokerHostSelector;
 import org.eclipse.jetty.server.Server;
 
 import java.util.List;
@@ -71,19 +67,16 @@
           @Override
           public void configure(Binder binder)
           {
-            JsonConfigProvider.bind(binder, "druid.router", TierConfig.class);
+            JsonConfigProvider.bind(binder, "druid.router", TieredBrokerConfig.class);
 
             binder.bind(CoordinatorRuleManager.class);
             LifecycleModule.register(binder, CoordinatorRuleManager.class);
 
-            binder.bind(QueryToolChestWarehouse.class).to(MapQueryToolChestWarehouse.class);
+            binder.bind(TieredBrokerHostSelector.class).in(ManageLifecycle.class);
+            binder.bind(QueryHostFinder.class).in(LazySingleton.class);
+            binder.bind(RoutingDruidClient.class).in(LazySingleton.class);
 
-            binder.bind(BrokerSelector.class).in(ManageLifecycle.class);
-            binder.bind(QuerySegmentWalker.class).to(RouterQuerySegmentWalker.class).in(LazySingleton.class);
-
-            binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class).in(LazySingleton.class);
-            Jerseys.addResource(binder, QueryResource.class);
-            LifecycleModule.register(binder, QueryResource.class);
+            binder.bind(JettyServerInitializer.class).to(RouterJettyServerInitializer.class).in(LazySingleton.class);
 
             LifecycleModule.register(binder, Server.class);
             DiscoveryModule.register(binder, Self.class);
@@ -92,7 +85,7 @@
           @Provides
           @ManageLifecycle
           public ServerDiscoverySelector getCoordinatorServerDiscoverySelector(
-              TierConfig config,
+              TieredBrokerConfig config,
               ServerDiscoveryFactory factory
 
           )
diff --git a/services/src/main/java/io/druid/cli/QueryJettyServerInitializer.java b/services/src/main/java/io/druid/cli/QueryJettyServerInitializer.java
index f3be30e..34039e2 100644
--- a/services/src/main/java/io/druid/cli/QueryJettyServerInitializer.java
+++ b/services/src/main/java/io/druid/cli/QueryJettyServerInitializer.java
@@ -38,16 +38,13 @@
   @Override
   public void initialize(Server server, Injector injector)
   {
-    final ServletContextHandler queries = new ServletContextHandler(ServletContextHandler.SESSIONS);
-    queries.setResourceBase("/");
-
     final ServletContextHandler root = new ServletContextHandler(ServletContextHandler.SESSIONS);
     root.addServlet(new ServletHolder(new DefaultServlet()), "/*");
     root.addFilter(GzipFilter.class, "/*", null);
     root.addFilter(GuiceFilter.class, "/*", null);
 
     final HandlerList handlerList = new HandlerList();
-    handlerList.setHandlers(new Handler[]{queries, root, new DefaultHandler()});
+    handlerList.setHandlers(new Handler[]{root, new DefaultHandler()});
     server.setHandler(handlerList);
   }
 }
diff --git a/services/src/main/java/io/druid/cli/RouterJettyServerInitializer.java b/services/src/main/java/io/druid/cli/RouterJettyServerInitializer.java
new file mode 100644
index 0000000..8ed1849
--- /dev/null
+++ b/services/src/main/java/io/druid/cli/RouterJettyServerInitializer.java
@@ -0,0 +1,104 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012, 2013  Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+
+package io.druid.cli;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import com.google.inject.servlet.GuiceFilter;
+import com.metamx.emitter.service.ServiceEmitter;
+import io.druid.client.RoutingDruidClient;
+import io.druid.guice.annotations.Json;
+import io.druid.guice.annotations.Smile;
+import io.druid.server.AsyncQueryForwardingServlet;
+import io.druid.server.QueryIDProvider;
+import io.druid.server.initialization.JettyServerInitializer;
+import io.druid.server.log.RequestLogger;
+import io.druid.server.router.QueryHostFinder;
+import org.eclipse.jetty.server.Handler;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.handler.DefaultHandler;
+import org.eclipse.jetty.server.handler.HandlerList;
+import org.eclipse.jetty.servlet.DefaultServlet;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.eclipse.jetty.servlets.GzipFilter;
+
+/** Jetty initializer that installs a context serving /druid/v2/* through an AsyncQueryForwardingServlet,
+ *  followed by a root context ("/*") with a DefaultServlet behind GzipFilter and GuiceFilter. */
+public class RouterJettyServerInitializer implements JettyServerInitializer
+{
+  private final ObjectMapper jsonMapper; // the seven fields below are only stored here and handed to the servlet in initialize()
+  private final ObjectMapper smileMapper;
+  private final QueryHostFinder hostFinder;
+  private final RoutingDruidClient routingDruidClient;
+  private final ServiceEmitter emitter;
+  private final RequestLogger requestLogger;
+  private final QueryIDProvider idProvider;
+
+  @Inject // every collaborator is supplied through this injected constructor
+  public RouterJettyServerInitializer(
+      @Json ObjectMapper jsonMapper,
+      @Smile ObjectMapper smileMapper,
+      QueryHostFinder hostFinder,
+      RoutingDruidClient routingDruidClient,
+      ServiceEmitter emitter,
+      RequestLogger requestLogger,
+      QueryIDProvider idProvider
+  )
+  {
+    this.jsonMapper = jsonMapper;
+    this.smileMapper = smileMapper;
+    this.hostFinder = hostFinder;
+    this.routingDruidClient = routingDruidClient;
+    this.emitter = emitter;
+    this.requestLogger = requestLogger;
+    this.idProvider = idProvider;
+  }
+
+  @Override
+  public void initialize(Server server, Injector injector) // injector is not referenced in this implementation
+  {
+    final ServletContextHandler queries = new ServletContextHandler(ServletContextHandler.SESSIONS); // context for forwarded queries
+    queries.addServlet(
+        new ServletHolder(
+            new AsyncQueryForwardingServlet(
+                jsonMapper,
+                smileMapper,
+                hostFinder,
+                routingDruidClient,
+                emitter,
+                requestLogger,
+                idProvider
+            )
+        ), "/druid/v2/*" // the forwarding servlet is mapped to every path under /druid/v2/
+    );
+    queries.addFilter(GzipFilter.class, "/druid/v2/*", null); // same path pattern as the servlet above
+
+    final ServletContextHandler root = new ServletContextHandler(ServletContextHandler.SESSIONS); // catch-all context
+    root.addServlet(new ServletHolder(new DefaultServlet()), "/*");
+    root.addFilter(GzipFilter.class, "/*", null);
+    root.addFilter(GuiceFilter.class, "/*", null); // NOTE(review): presumably hands remaining requests to Guice-managed resources; confirm
+
+    final HandlerList handlerList = new HandlerList();
+    handlerList.setHandlers(new Handler[]{queries, root, new DefaultHandler()}); // registration order: queries, root, DefaultHandler
+    server.setHandler(handlerList);
+  }
+}