Merge pull request #456 from metamx/processing-threads-default
more sensible defaults
diff --git a/pom.xml b/pom.xml
index 02575cd..d1b94e4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -18,7 +18,8 @@
~ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
@@ -313,17 +314,17 @@
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
- <version>8.1.11.v20130520</version>
+ <version>9.1.3.v20140225</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlet</artifactId>
- <version>8.1.11.v20130520</version>
+ <version>9.1.3.v20140225</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlets</artifactId>
- <version>8.1.11.v20130520</version>
+ <version>9.1.3.v20140225</version>
</dependency>
<dependency>
<groupId>joda-time</groupId>
diff --git a/server/src/main/java/io/druid/client/CachePopulatingQueryRunner.java b/server/src/main/java/io/druid/client/CachePopulatingQueryRunner.java
index a999794..22d609d 100644
--- a/server/src/main/java/io/druid/client/CachePopulatingQueryRunner.java
+++ b/server/src/main/java/io/druid/client/CachePopulatingQueryRunner.java
@@ -20,6 +20,8 @@
package io.druid.client;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.metamx.common.guava.Accumulator;
import com.metamx.common.guava.Sequence;
@@ -33,6 +35,7 @@
import io.druid.query.QueryToolChest;
import io.druid.query.SegmentDescriptor;
+import javax.annotation.Nullable;
import java.util.ArrayList;
public class CachePopulatingQueryRunner<T> implements QueryRunner<T>
@@ -72,26 +75,33 @@
final boolean populateCache = Boolean.parseBoolean(query.getContextValue(CacheConfig.POPULATE_CACHE, "true"))
&& strategy != null
- && cacheConfig.isPopulateCache()
- // historical only populates distributed cache since the cache lookups are done at broker.
- && !(cache instanceof MapCache);
+ && cacheConfig.isPopulateCache();
+
+ final Sequence<T> results = base.run(query);
+
if (populateCache) {
- Sequence<T> results = base.run(query);
- Cache.NamedKey key = CacheUtil.computeSegmentCacheKey(
+ final Cache.NamedKey key = CacheUtil.computeSegmentCacheKey(
segmentIdentifier,
segmentDescriptor,
strategy.computeCacheKey(query)
);
- ArrayList<T> resultAsList = Sequences.toList(results, new ArrayList<T>());
- CacheUtil.populate(
- cache,
- mapper,
- key,
- Lists.transform(resultAsList, strategy.prepareForCache())
+
+ final Function cacheFn = strategy.prepareForCache();
+ return Sequences.map(
+ results,
+ new Function<T, T>()
+ {
+ @Nullable
+ @Override
+ public T apply(@Nullable T input)
+ {
+ CacheUtil.populate(cache, mapper, key, ImmutableList.of(cacheFn.apply(input)));
+ return input;
+ }
+ }
);
- return Sequences.simple(resultAsList);
} else {
- return base.run(query);
+ return results;
}
}
diff --git a/server/src/main/java/io/druid/client/RoutingDruidClient.java b/server/src/main/java/io/druid/client/RoutingDruidClient.java
new file mode 100644
index 0000000..9fd3e2b
--- /dev/null
+++ b/server/src/main/java/io/druid/client/RoutingDruidClient.java
@@ -0,0 +1,113 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012, 2013 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package io.druid.client;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.smile.SmileFactory;
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.metamx.common.logger.Logger;
+import com.metamx.http.client.HttpClient;
+import com.metamx.http.client.response.HttpResponseHandler;
+import io.druid.guice.annotations.Global;
+import io.druid.query.Query;
+import org.jboss.netty.handler.codec.http.HttpHeaders;
+
+import javax.inject.Inject;
+import java.io.IOException;
+import java.net.URL;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ */
+public class RoutingDruidClient<IntermediateType, FinalType>
+{
+ private static final Logger log = new Logger(RoutingDruidClient.class);
+
+ private final ObjectMapper objectMapper;
+ private final HttpClient httpClient;
+
+ private final AtomicInteger openConnections;
+ private final boolean isSmile;
+
+ @Inject
+ public RoutingDruidClient(
+ ObjectMapper objectMapper,
+ @Global HttpClient httpClient
+ )
+ {
+ this.objectMapper = objectMapper;
+ this.httpClient = httpClient;
+
+ this.isSmile = this.objectMapper.getFactory() instanceof SmileFactory;
+ this.openConnections = new AtomicInteger();
+ }
+
+ public int getNumOpenConnections()
+ {
+ return openConnections.get();
+ }
+
+ public ListenableFuture<FinalType> run(
+ String host,
+ Query query,
+ HttpResponseHandler<IntermediateType, FinalType> responseHandler
+ )
+ {
+ final ListenableFuture<FinalType> future;
+ final String url = String.format("http://%s/druid/v2/", host);
+
+ try {
+ log.debug("Querying url[%s]", url);
+ future = httpClient
+ .post(new URL(url))
+ .setContent(objectMapper.writeValueAsBytes(query))
+ .setHeader(HttpHeaders.Names.CONTENT_TYPE, isSmile ? "application/smile" : "application/json")
+ .go(responseHandler);
+
+ openConnections.getAndIncrement();
+
+ Futures.addCallback(
+ future,
+ new FutureCallback<FinalType>()
+ {
+ @Override
+ public void onSuccess(FinalType result)
+ {
+ openConnections.getAndDecrement();
+ }
+
+ @Override
+ public void onFailure(Throwable t)
+ {
+ openConnections.getAndDecrement();
+ }
+ }
+ );
+ }
+ catch (IOException e) {
+ throw Throwables.propagate(e);
+ }
+
+ return future;
+ }
+}
diff --git a/server/src/main/java/io/druid/client/selector/HostSelector.java b/server/src/main/java/io/druid/client/selector/HostSelector.java
new file mode 100644
index 0000000..dd8f366a
--- /dev/null
+++ b/server/src/main/java/io/druid/client/selector/HostSelector.java
@@ -0,0 +1,33 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012, 2013 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package io.druid.client.selector;
+
+import com.metamx.common.Pair;
+import io.druid.curator.discovery.ServerDiscoverySelector;
+import io.druid.query.Query;
+
+/**
+ */
+public interface HostSelector<T>
+{
+ public String getDefaultServiceName();
+
+ public Pair<String, ServerDiscoverySelector> select(Query<T> query);
+}
diff --git a/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java b/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java
new file mode 100644
index 0000000..85f33a7
--- /dev/null
+++ b/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java
@@ -0,0 +1,238 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package io.druid.server;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.api.client.repackaged.com.google.common.base.Throwables;
+import com.metamx.emitter.EmittingLogger;
+import com.metamx.emitter.service.ServiceEmitter;
+import com.metamx.emitter.service.ServiceMetricEvent;
+import com.metamx.http.client.response.ClientResponse;
+import com.metamx.http.client.response.HttpResponseHandler;
+import io.druid.client.RoutingDruidClient;
+import io.druid.guice.annotations.Json;
+import io.druid.guice.annotations.Smile;
+import io.druid.query.Query;
+import io.druid.server.log.RequestLogger;
+import io.druid.server.router.QueryHostFinder;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.handler.codec.http.HttpChunk;
+import org.jboss.netty.handler.codec.http.HttpResponse;
+import org.joda.time.DateTime;
+
+import javax.servlet.AsyncContext;
+import javax.servlet.ServletException;
+import javax.servlet.annotation.WebServlet;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.Charset;
+
+/**
+ */
+@WebServlet(asyncSupported = true)
+public class AsyncQueryForwardingServlet extends HttpServlet
+{
+ private static final EmittingLogger log = new EmittingLogger(AsyncQueryForwardingServlet.class);
+ private static final Charset UTF8 = Charset.forName("UTF-8");
+ private static final String DISPATCHED = "dispatched";
+
+ private final ObjectMapper jsonMapper;
+ private final ObjectMapper smileMapper;
+ private final QueryHostFinder hostFinder;
+ private final RoutingDruidClient routingDruidClient;
+ private final ServiceEmitter emitter;
+ private final RequestLogger requestLogger;
+ private final QueryIDProvider idProvider;
+
+ public AsyncQueryForwardingServlet(
+ @Json ObjectMapper jsonMapper,
+ @Smile ObjectMapper smileMapper,
+ QueryHostFinder hostFinder,
+ RoutingDruidClient routingDruidClient,
+ ServiceEmitter emitter,
+ RequestLogger requestLogger,
+ QueryIDProvider idProvider
+ )
+ {
+ this.jsonMapper = jsonMapper;
+ this.smileMapper = smileMapper;
+ this.hostFinder = hostFinder;
+ this.routingDruidClient = routingDruidClient;
+ this.emitter = emitter;
+ this.requestLogger = requestLogger;
+ this.idProvider = idProvider;
+ }
+
+ @Override
+ protected void doPost(
+ final HttpServletRequest req, final HttpServletResponse resp
+ ) throws ServletException, IOException
+ {
+ final long start = System.currentTimeMillis();
+ Query query = null;
+ String queryId;
+
+ final boolean isSmile = "application/smile".equals(req.getContentType());
+
+ ObjectMapper objectMapper = isSmile ? smileMapper : jsonMapper;
+
+ OutputStream out = null;
+
+ try {
+ final AsyncContext ctx = req.startAsync(req, resp);
+
+ if (req.getAttribute(DISPATCHED) != null) {
+ return;
+ }
+
+ req.setAttribute(DISPATCHED, true);
+ resp.setStatus(200);
+ resp.setContentType("application/x-javascript");
+
+ query = objectMapper.readValue(req.getInputStream(), Query.class);
+ queryId = query.getId();
+ if (queryId == null) {
+ queryId = idProvider.next(query);
+ query = query.withId(queryId);
+ }
+
+ requestLogger.log(
+ new RequestLogLine(new DateTime(), req.getRemoteAddr(), query)
+ );
+ out = resp.getOutputStream();
+ final OutputStream outputStream = out;
+
+ final String host = hostFinder.getHost(query);
+
+ final Query theQuery = query;
+ final String theQueryId = queryId;
+
+ final HttpResponseHandler<OutputStream, OutputStream> responseHandler = new HttpResponseHandler<OutputStream, OutputStream>()
+ {
+ @Override
+ public ClientResponse<OutputStream> handleResponse(HttpResponse response)
+ {
+ byte[] bytes = getContentBytes(response.getContent());
+ if (bytes.length > 0) {
+ try {
+ outputStream.write(bytes);
+ }
+ catch (Exception e) {
+ throw Throwables.propagate(e);
+ }
+ }
+ return ClientResponse.finished(outputStream);
+ }
+
+ @Override
+ public ClientResponse<OutputStream> handleChunk(
+ ClientResponse<OutputStream> clientResponse, HttpChunk chunk
+ )
+ {
+ byte[] bytes = getContentBytes(chunk.getContent());
+ if (bytes.length > 0) {
+ try {
+ clientResponse.getObj().write(bytes);
+ }
+ catch (Exception e) {
+ throw Throwables.propagate(e);
+ }
+ }
+ return clientResponse;
+ }
+
+ @Override
+ public ClientResponse<OutputStream> done(ClientResponse<OutputStream> clientResponse)
+ {
+ final long requestTime = System.currentTimeMillis() - start;
+
+ log.info("Request time: %d", requestTime);
+
+ emitter.emit(
+ new ServiceMetricEvent.Builder()
+ .setUser2(theQuery.getDataSource().getName())
+ .setUser4(theQuery.getType())
+ .setUser5(theQuery.getIntervals().get(0).toString())
+ .setUser6(String.valueOf(theQuery.hasFilters()))
+ .setUser7(req.getRemoteAddr())
+ .setUser8(theQueryId)
+ .setUser9(theQuery.getDuration().toPeriod().toStandardMinutes().toString())
+ .build("request/time", requestTime)
+ );
+
+ final OutputStream obj = clientResponse.getObj();
+ try {
+ resp.flushBuffer();
+ outputStream.close();
+ }
+ catch (Exception e) {
+ throw Throwables.propagate(e);
+ }
+ finally {
+ ctx.dispatch();
+ }
+
+ return ClientResponse.finished(obj);
+ }
+
+ private byte[] getContentBytes(ChannelBuffer content)
+ {
+ byte[] contentBytes = new byte[content.readableBytes()];
+ content.readBytes(contentBytes);
+ return contentBytes;
+ }
+ };
+
+ ctx.start(
+ new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ routingDruidClient.run(host, theQuery, responseHandler);
+ }
+ }
+ );
+ }
+ catch (Exception e) {
+ if (!resp.isCommitted()) {
+ resp.setStatus(500);
+ resp.resetBuffer();
+
+ if (out == null) {
+ out = resp.getOutputStream();
+ }
+
+ out.write((e.getMessage() == null) ? "Exception null".getBytes(UTF8) : e.getMessage().getBytes(UTF8));
+ out.write("\n".getBytes(UTF8));
+ }
+
+ resp.flushBuffer();
+
+ log.makeAlert(e, "Exception handling request")
+ .addData("query", query)
+ .addData("peer", req.getRemoteAddr())
+ .emit();
+ }
+ }
+}
diff --git a/server/src/main/java/io/druid/server/QueryResource.java b/server/src/main/java/io/druid/server/QueryResource.java
index 051a56e..a69a385 100644
--- a/server/src/main/java/io/druid/server/QueryResource.java
+++ b/server/src/main/java/io/druid/server/QueryResource.java
@@ -136,8 +136,8 @@
.setUser5(query.getIntervals().get(0).toString())
.setUser6(String.valueOf(query.hasFilters()))
.setUser7(req.getRemoteAddr())
+ .setUser8(queryId)
.setUser9(query.getDuration().toPeriod().toStandardMinutes().toString())
- .setUser10(queryId)
.build("request/time", requestTime)
);
}
diff --git a/server/src/main/java/io/druid/server/initialization/JettyServerModule.java b/server/src/main/java/io/druid/server/initialization/JettyServerModule.java
index 246645d..0b93dce 100644
--- a/server/src/main/java/io/druid/server/initialization/JettyServerModule.java
+++ b/server/src/main/java/io/druid/server/initialization/JettyServerModule.java
@@ -49,7 +49,7 @@
import io.druid.server.StatusResource;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Server;
-import org.eclipse.jetty.server.nio.SelectChannelConnector;
+import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import javax.servlet.ServletException;
@@ -154,13 +154,11 @@
threadPool.setMinThreads(config.getNumThreads());
threadPool.setMaxThreads(config.getNumThreads());
- final Server server = new Server();
- server.setThreadPool(threadPool);
+ final Server server = new Server(threadPool);
- SelectChannelConnector connector = new SelectChannelConnector();
+ ServerConnector connector = new ServerConnector(server);
connector.setPort(node.getPort());
- connector.setMaxIdleTime(Ints.checkedCast(config.getMaxIdleTime().toStandardDuration().getMillis()));
- connector.setStatsOn(true);
+ connector.setIdleTimeout(Ints.checkedCast(config.getMaxIdleTime().toStandardDuration().getMillis()));
server.setConnectors(new Connector[]{connector});
diff --git a/server/src/main/java/io/druid/server/router/CoordinatorRuleManager.java b/server/src/main/java/io/druid/server/router/CoordinatorRuleManager.java
index 9acf5f9..adf8e6f 100644
--- a/server/src/main/java/io/druid/server/router/CoordinatorRuleManager.java
+++ b/server/src/main/java/io/druid/server/router/CoordinatorRuleManager.java
@@ -57,7 +57,7 @@
private final HttpClient httpClient;
private final ObjectMapper jsonMapper;
- private final Supplier<TierConfig> config;
+ private final Supplier<TieredBrokerConfig> config;
private final ServerDiscoverySelector selector;
private final StatusResponseHandler responseHandler;
@@ -73,7 +73,7 @@
public CoordinatorRuleManager(
@Global HttpClient httpClient,
@Json ObjectMapper jsonMapper,
- Supplier<TierConfig> config,
+ Supplier<TieredBrokerConfig> config,
ServerDiscoverySelector selector
)
{
diff --git a/server/src/main/java/io/druid/server/router/QueryHostFinder.java b/server/src/main/java/io/druid/server/router/QueryHostFinder.java
new file mode 100644
index 0000000..dfd94ca
--- /dev/null
+++ b/server/src/main/java/io/druid/server/router/QueryHostFinder.java
@@ -0,0 +1,98 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012, 2013 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package io.druid.server.router;
+
+import com.google.inject.Inject;
+import com.metamx.common.Pair;
+import com.metamx.emitter.EmittingLogger;
+import io.druid.client.selector.Server;
+import io.druid.curator.discovery.ServerDiscoverySelector;
+import io.druid.query.Query;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ */
+public class QueryHostFinder<T>
+{
+ private static EmittingLogger log = new EmittingLogger(QueryHostFinder.class);
+
+ private final TieredBrokerHostSelector hostSelector;
+
+ private final ConcurrentHashMap<String, Server> serverBackup = new ConcurrentHashMap<String, Server>();
+
+ @Inject
+ public QueryHostFinder(
+ TieredBrokerHostSelector hostSelector
+ )
+ {
+ this.hostSelector = hostSelector;
+ }
+
+ public Server findServer(Query<T> query)
+ {
+ final Pair<String, ServerDiscoverySelector> selected = hostSelector.select(query);
+
+ final String serviceName = selected.lhs;
+ final ServerDiscoverySelector selector = selected.rhs;
+
+ Server server = selector.pick();
+ if (server == null) {
+ log.error(
+ "WTF?! No server found for serviceName[%s]. Using backup",
+ serviceName
+ );
+
+ server = serverBackup.get(serviceName);
+
+ if (server == null) {
+ log.error(
+ "WTF?! No backup found for serviceName[%s]. Using default[%s]",
+ serviceName,
+ hostSelector.getDefaultServiceName()
+ );
+
+ server = serverBackup.get(hostSelector.getDefaultServiceName());
+ }
+ }
+ if (server != null) {
+ serverBackup.put(serviceName, server);
+ }
+
+ return server;
+ }
+
+ public String getHost(Query<T> query)
+ {
+ Server server = findServer(query);
+
+ if (server == null) {
+ log.makeAlert(
+ "Catastrophic failure! No servers found at all! Failing request!"
+ ).emit();
+
+ return null;
+ }
+
+ log.info("Selected [%s]", server.getHost());
+
+ return server.getHost();
+ }
+}
diff --git a/server/src/main/java/io/druid/server/router/RouterQuerySegmentWalker.java b/server/src/main/java/io/druid/server/router/RouterQuerySegmentWalker.java
deleted file mode 100644
index 92bf431..0000000
--- a/server/src/main/java/io/druid/server/router/RouterQuerySegmentWalker.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Druid - a distributed column store.
- * Copyright (C) 2012, 2013 Metamarkets Group Inc.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU General Public License
- * as published by the Free Software Foundation; either version 2
- * of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-
-package io.druid.server.router;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.inject.Inject;
-import com.metamx.http.client.HttpClient;
-import io.druid.curator.discovery.ServerDiscoveryFactory;
-import io.druid.guice.annotations.Global;
-import io.druid.query.Query;
-import io.druid.query.QueryRunner;
-import io.druid.query.QuerySegmentWalker;
-import io.druid.query.QueryToolChestWarehouse;
-import io.druid.query.SegmentDescriptor;
-import org.joda.time.Interval;
-
-/**
- */
-public class RouterQuerySegmentWalker implements QuerySegmentWalker
-{
- private final QueryToolChestWarehouse warehouse;
- private final ObjectMapper objectMapper;
- private final HttpClient httpClient;
- private final BrokerSelector brokerSelector;
- private final TierConfig tierConfig;
-
- @Inject
- public RouterQuerySegmentWalker(
- QueryToolChestWarehouse warehouse,
- ObjectMapper objectMapper,
- @Global HttpClient httpClient,
- BrokerSelector brokerSelector,
- TierConfig tierConfig
- )
- {
- this.warehouse = warehouse;
- this.objectMapper = objectMapper;
- this.httpClient = httpClient;
- this.brokerSelector = brokerSelector;
- this.tierConfig = tierConfig;
- }
-
- @Override
- public <T> QueryRunner<T> getQueryRunnerForIntervals(Query<T> query, Iterable<Interval> intervals)
- {
- return makeRunner();
- }
-
- @Override
- public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, Iterable<SegmentDescriptor> specs)
- {
- return makeRunner();
- }
-
- private <T> QueryRunner<T> makeRunner()
- {
- return new TierAwareQueryRunner<T>(
- warehouse,
- objectMapper,
- httpClient,
- brokerSelector,
- tierConfig
- );
- }
-}
diff --git a/server/src/main/java/io/druid/server/router/TierAwareQueryRunner.java b/server/src/main/java/io/druid/server/router/TierAwareQueryRunner.java
deleted file mode 100644
index 3a098e0..0000000
--- a/server/src/main/java/io/druid/server/router/TierAwareQueryRunner.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Druid - a distributed column store.
- * Copyright (C) 2012, 2013 Metamarkets Group Inc.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU General Public License
- * as published by the Free Software Foundation; either version 2
- * of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-
-package io.druid.server.router;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Throwables;
-import com.metamx.common.Pair;
-import com.metamx.common.guava.Sequence;
-import com.metamx.common.guava.Sequences;
-import com.metamx.emitter.EmittingLogger;
-import com.metamx.http.client.HttpClient;
-import io.druid.client.DirectDruidClient;
-import io.druid.client.selector.Server;
-import io.druid.curator.discovery.ServerDiscoveryFactory;
-import io.druid.curator.discovery.ServerDiscoverySelector;
-import io.druid.query.Query;
-import io.druid.query.QueryRunner;
-import io.druid.query.QueryToolChestWarehouse;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- */
-public class TierAwareQueryRunner<T> implements QueryRunner<T>
-{
- private static EmittingLogger log = new EmittingLogger(TierAwareQueryRunner.class);
-
- private final QueryToolChestWarehouse warehouse;
- private final ObjectMapper objectMapper;
- private final HttpClient httpClient;
- private final BrokerSelector<T> brokerSelector;
- private final TierConfig tierConfig;
-
- private final ConcurrentHashMap<String, Server> serverBackup = new ConcurrentHashMap<String, Server>();
-
- public TierAwareQueryRunner(
- QueryToolChestWarehouse warehouse,
- ObjectMapper objectMapper,
- HttpClient httpClient,
- BrokerSelector<T> brokerSelector,
- TierConfig tierConfig
- )
- {
- this.warehouse = warehouse;
- this.objectMapper = objectMapper;
- this.httpClient = httpClient;
- this.brokerSelector = brokerSelector;
- this.tierConfig = tierConfig;
- }
-
- public Server findServer(Query<T> query)
- {
- final Pair<String, ServerDiscoverySelector> selected = brokerSelector.select(query);
- final String brokerServiceName = selected.lhs;
- final ServerDiscoverySelector selector = selected.rhs;
-
- Server server = selector.pick();
- if (server == null) {
- log.error(
- "WTF?! No server found for brokerServiceName[%s]. Using backup",
- brokerServiceName
- );
-
- server = serverBackup.get(brokerServiceName);
-
- if (server == null) {
- log.makeAlert(
- "WTF?! No backup found for brokerServiceName[%s]. Using default[%s]",
- brokerServiceName,
- tierConfig.getDefaultBrokerServiceName()
- ).emit();
-
- server = serverBackup.get(tierConfig.getDefaultBrokerServiceName());
- }
- } else {
- serverBackup.put(brokerServiceName, server);
- }
-
- return server;
- }
-
- @Override
- public Sequence<T> run(Query<T> query)
- {
- Server server = findServer(query);
-
- if (server == null) {
- log.makeAlert(
- "Catastrophic failure! No servers found at all! Failing request!"
- ).emit();
- return Sequences.empty();
- }
-
- QueryRunner<T> client = new DirectDruidClient<T>(
- warehouse,
- objectMapper,
- httpClient,
- server.getHost()
- );
-
- return client.run(query);
- }
-}
diff --git a/server/src/main/java/io/druid/server/router/TierConfig.java b/server/src/main/java/io/druid/server/router/TieredBrokerConfig.java
similarity index 98%
rename from server/src/main/java/io/druid/server/router/TierConfig.java
rename to server/src/main/java/io/druid/server/router/TieredBrokerConfig.java
index c819edf..67ff109 100644
--- a/server/src/main/java/io/druid/server/router/TierConfig.java
+++ b/server/src/main/java/io/druid/server/router/TieredBrokerConfig.java
@@ -29,7 +29,7 @@
/**
*/
-public class TierConfig
+public class TieredBrokerConfig
{
@JsonProperty
@NotNull
diff --git a/server/src/main/java/io/druid/server/router/BrokerSelector.java b/server/src/main/java/io/druid/server/router/TieredBrokerHostSelector.java
similarity index 67%
rename from server/src/main/java/io/druid/server/router/BrokerSelector.java
rename to server/src/main/java/io/druid/server/router/TieredBrokerHostSelector.java
index e27acf0..cc625cd 100644
--- a/server/src/main/java/io/druid/server/router/BrokerSelector.java
+++ b/server/src/main/java/io/druid/server/router/TieredBrokerHostSelector.java
@@ -20,20 +20,20 @@
package io.druid.server.router;
import com.google.common.base.Throwables;
+import com.google.common.collect.Iterables;
import com.google.inject.Inject;
import com.metamx.common.Pair;
-import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.emitter.EmittingLogger;
-import io.druid.concurrent.Execs;
+import io.druid.client.selector.HostSelector;
import io.druid.curator.discovery.ServerDiscoveryFactory;
import io.druid.curator.discovery.ServerDiscoverySelector;
import io.druid.query.Query;
+import io.druid.query.timeboundary.TimeBoundaryQuery;
import io.druid.server.coordinator.rules.LoadRule;
import io.druid.server.coordinator.rules.Rule;
import org.joda.time.DateTime;
-import org.joda.time.Duration;
import org.joda.time.Interval;
import java.util.List;
@@ -42,23 +42,23 @@
/**
*/
-public class BrokerSelector<T>
+public class TieredBrokerHostSelector<T> implements HostSelector<T>
{
- private static EmittingLogger log = new EmittingLogger(BrokerSelector.class);
+ private static EmittingLogger log = new EmittingLogger(TieredBrokerHostSelector.class);
private final CoordinatorRuleManager ruleManager;
- private final TierConfig tierConfig;
+ private final TieredBrokerConfig tierConfig;
private final ServerDiscoveryFactory serverDiscoveryFactory;
- private final ConcurrentHashMap<String, ServerDiscoverySelector> selectorMap = new ConcurrentHashMap<String, ServerDiscoverySelector>();
+ private final ConcurrentHashMap<String, ServerDiscoverySelector> selectorMap = new ConcurrentHashMap<>();
private final Object lock = new Object();
private volatile boolean started = false;
@Inject
- public BrokerSelector(
+ public TieredBrokerHostSelector(
CoordinatorRuleManager ruleManager,
- TierConfig tierConfig,
+ TieredBrokerConfig tierConfig,
ServerDiscoveryFactory serverDiscoveryFactory
)
{
@@ -112,6 +112,12 @@
}
}
+ @Override
+ public String getDefaultServiceName()
+ {
+ return tierConfig.getDefaultBrokerServiceName();
+ }
+
public Pair<String, ServerDiscoverySelector> select(final Query<T> query)
{
synchronized (lock) {
@@ -120,35 +126,46 @@
}
}
- List<Rule> rules = ruleManager.getRulesWithDefault((query.getDataSource()).getName());
+ String brokerServiceName = null;
- // find the rule that can apply to the entire set of intervals
- DateTime now = new DateTime();
- int lastRulePosition = -1;
- LoadRule baseRule = null;
+ // Somewhat janky way of always selecting highest priority broker for this type of query
+ if (query instanceof TimeBoundaryQuery) {
+ brokerServiceName = Iterables.getFirst(
+ tierConfig.getTierToBrokerMap().values(),
+ tierConfig.getDefaultBrokerServiceName()
+ );
+ }
- for (Interval interval : query.getIntervals()) {
- int currRulePosition = 0;
- for (Rule rule : rules) {
- if (rule instanceof LoadRule && currRulePosition > lastRulePosition && rule.appliesTo(interval, now)) {
- lastRulePosition = currRulePosition;
- baseRule = (LoadRule) rule;
+ if (brokerServiceName == null) {
+ List<Rule> rules = ruleManager.getRulesWithDefault((query.getDataSource()).getName());
+
+ // find the rule that can apply to the entire set of intervals
+ DateTime now = new DateTime();
+ int lastRulePosition = -1;
+ LoadRule baseRule = null;
+
+ for (Interval interval : query.getIntervals()) {
+ int currRulePosition = 0;
+ for (Rule rule : rules) {
+ if (rule instanceof LoadRule && currRulePosition > lastRulePosition && rule.appliesTo(interval, now)) {
+ lastRulePosition = currRulePosition;
+ baseRule = (LoadRule) rule;
+ break;
+ }
+ currRulePosition++;
+ }
+ }
+
+ if (baseRule == null) {
+ return null;
+ }
+
+ // in the baseRule, find the broker of highest priority
+ for (Map.Entry<String, String> entry : tierConfig.getTierToBrokerMap().entrySet()) {
+ if (baseRule.getTieredReplicants().containsKey(entry.getKey())) {
+ brokerServiceName = entry.getValue();
break;
}
- currRulePosition++;
- }
- }
-
- if (baseRule == null) {
- return null;
- }
-
- // in the baseRule, find the broker of highest priority
- String brokerServiceName = null;
- for (Map.Entry<String, String> entry : tierConfig.getTierToBrokerMap().entrySet()) {
- if (baseRule.getTieredReplicants().containsKey(entry.getKey())) {
- brokerServiceName = entry.getValue();
- break;
}
}
diff --git a/server/src/test/java/io/druid/client/CachePopulatingQueryRunnerTest.java b/server/src/test/java/io/druid/client/CachePopulatingQueryRunnerTest.java
index b87cb15..bb73c5d 100644
--- a/server/src/test/java/io/druid/client/CachePopulatingQueryRunnerTest.java
+++ b/server/src/test/java/io/druid/client/CachePopulatingQueryRunnerTest.java
@@ -123,7 +123,7 @@
Sequence res = runner.run(builder.build());
// base sequence is not closed yet
- Assert.assertTrue(closable.isClosed());
+ Assert.assertFalse("sequence must not be closed", closable.isClosed());
ArrayList results = Sequences.toList(res, new ArrayList());
Assert.assertTrue(closable.isClosed());
Assert.assertEquals(expectedRes, results);
diff --git a/server/src/test/java/io/druid/server/router/TierAwareQueryRunnerTest.java b/server/src/test/java/io/druid/server/router/QueryHostFinderTest.java
similarity index 90%
rename from server/src/test/java/io/druid/server/router/TierAwareQueryRunnerTest.java
rename to server/src/test/java/io/druid/server/router/QueryHostFinderTest.java
index d788daa..e8bf260 100644
--- a/server/src/test/java/io/druid/server/router/TierAwareQueryRunnerTest.java
+++ b/server/src/test/java/io/druid/server/router/QueryHostFinderTest.java
@@ -40,20 +40,20 @@
/**
*/
-public class TierAwareQueryRunnerTest
+public class QueryHostFinderTest
{
private ServerDiscoverySelector selector;
- private BrokerSelector brokerSelector;
- private TierConfig config;
+ private TieredBrokerHostSelector brokerSelector;
+ private TieredBrokerConfig config;
private Server server;
@Before
public void setUp() throws Exception
{
selector = EasyMock.createMock(ServerDiscoverySelector.class);
- brokerSelector = EasyMock.createMock(BrokerSelector.class);
+ brokerSelector = EasyMock.createMock(TieredBrokerHostSelector.class);
- config = new TierConfig()
+ config = new TieredBrokerConfig()
{
@Override
public LinkedHashMap<String, String> getTierToBrokerMap()
@@ -118,12 +118,8 @@
EasyMock.expect(selector.pick()).andReturn(server).once();
EasyMock.replay(selector);
- TierAwareQueryRunner queryRunner = new TierAwareQueryRunner(
- null,
- null,
- null,
- brokerSelector,
- config
+ QueryHostFinder queryRunner = new QueryHostFinder(
+ brokerSelector
);
Server server = queryRunner.findServer(
diff --git a/server/src/test/java/io/druid/server/router/BrokerSelectorTest.java b/server/src/test/java/io/druid/server/router/TieredBrokerHostSelectorTest.java
similarity index 84%
rename from server/src/test/java/io/druid/server/router/BrokerSelectorTest.java
rename to server/src/test/java/io/druid/server/router/TieredBrokerHostSelectorTest.java
index 55e5dda..b2a22af 100644
--- a/server/src/test/java/io/druid/server/router/BrokerSelectorTest.java
+++ b/server/src/test/java/io/druid/server/router/TieredBrokerHostSelectorTest.java
@@ -50,11 +50,11 @@
/**
*/
-public class BrokerSelectorTest
+public class TieredBrokerHostSelectorTest
{
private ServerDiscoveryFactory factory;
private ServerDiscoverySelector selector;
- private BrokerSelector brokerSelector;
+ private TieredBrokerHostSelector brokerSelector;
@Before
public void setUp() throws Exception
@@ -62,9 +62,9 @@
factory = EasyMock.createMock(ServerDiscoveryFactory.class);
selector = EasyMock.createMock(ServerDiscoverySelector.class);
- brokerSelector = new BrokerSelector(
+ brokerSelector = new TieredBrokerHostSelector(
new TestRuleManager(null, null, null, null),
- new TierConfig()
+ new TieredBrokerConfig()
{
@Override
public LinkedHashMap<String, String> getTierToBrokerMap()
@@ -112,11 +112,12 @@
public void testBasicSelect() throws Exception
{
String brokerName = (String) brokerSelector.select(
- new TimeBoundaryQuery(
- new TableDataSource("test"),
- new MultipleIntervalSegmentSpec(Arrays.<Interval>asList(new Interval("2011-08-31/2011-09-01"))),
- null
- )
+ Druids.newTimeseriesQueryBuilder()
+ .dataSource("test")
+ .granularity("all")
+ .aggregators(Arrays.<AggregatorFactory>asList(new CountAggregatorFactory("rows")))
+ .intervals(Arrays.<Interval>asList(new Interval("2011-08-31/2011-09-01")))
+ .build()
).lhs;
Assert.assertEquals("coldBroker", brokerName);
@@ -127,11 +128,12 @@
public void testBasicSelect2() throws Exception
{
String brokerName = (String) brokerSelector.select(
- new TimeBoundaryQuery(
- new TableDataSource("test"),
- new MultipleIntervalSegmentSpec(Arrays.<Interval>asList(new Interval("2013-08-31/2013-09-01"))),
- null
- )
+ Druids.newTimeseriesQueryBuilder()
+ .dataSource("test")
+ .granularity("all")
+ .aggregators(Arrays.<AggregatorFactory>asList(new CountAggregatorFactory("rows")))
+ .intervals(Arrays.<Interval>asList(new Interval("2013-08-31/2013-09-01")))
+ .build()
).lhs;
Assert.assertEquals("hotBroker", brokerName);
@@ -141,11 +143,12 @@
public void testSelectMatchesNothing() throws Exception
{
Pair retVal = brokerSelector.select(
- new TimeBoundaryQuery(
- new TableDataSource("test"),
- new MultipleIntervalSegmentSpec(Arrays.<Interval>asList(new Interval("2010-08-31/2010-09-01"))),
- null
- )
+ Druids.newTimeseriesQueryBuilder()
+ .dataSource("test")
+ .granularity("all")
+ .aggregators(Arrays.<AggregatorFactory>asList(new CountAggregatorFactory("rows")))
+ .intervals(Arrays.<Interval>asList(new Interval("2010-08-31/2010-09-01")))
+ .build()
);
Assert.assertEquals(null, retVal);
@@ -199,7 +202,7 @@
public TestRuleManager(
@Global HttpClient httpClient,
@Json ObjectMapper jsonMapper,
- Supplier<TierConfig> config,
+ Supplier<TieredBrokerConfig> config,
ServerDiscoverySelector selector
)
{
diff --git a/services/src/main/java/io/druid/cli/CliRouter.java b/services/src/main/java/io/druid/cli/CliRouter.java
index 740edd8..1d0d100 100644
--- a/services/src/main/java/io/druid/cli/CliRouter.java
+++ b/services/src/main/java/io/druid/cli/CliRouter.java
@@ -25,24 +25,20 @@
import com.google.inject.Provides;
import com.metamx.common.logger.Logger;
import io.airlift.command.Command;
+import io.druid.client.RoutingDruidClient;
import io.druid.curator.discovery.DiscoveryModule;
import io.druid.curator.discovery.ServerDiscoveryFactory;
import io.druid.curator.discovery.ServerDiscoverySelector;
-import io.druid.guice.Jerseys;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.LazySingleton;
import io.druid.guice.LifecycleModule;
import io.druid.guice.ManageLifecycle;
import io.druid.guice.annotations.Self;
-import io.druid.query.MapQueryToolChestWarehouse;
-import io.druid.query.QuerySegmentWalker;
-import io.druid.query.QueryToolChestWarehouse;
-import io.druid.server.QueryResource;
import io.druid.server.initialization.JettyServerInitializer;
-import io.druid.server.router.BrokerSelector;
import io.druid.server.router.CoordinatorRuleManager;
-import io.druid.server.router.RouterQuerySegmentWalker;
-import io.druid.server.router.TierConfig;
+import io.druid.server.router.QueryHostFinder;
+import io.druid.server.router.TieredBrokerConfig;
+import io.druid.server.router.TieredBrokerHostSelector;
import org.eclipse.jetty.server.Server;
import java.util.List;
@@ -71,19 +67,16 @@
@Override
public void configure(Binder binder)
{
- JsonConfigProvider.bind(binder, "druid.router", TierConfig.class);
+ JsonConfigProvider.bind(binder, "druid.router", TieredBrokerConfig.class);
binder.bind(CoordinatorRuleManager.class);
LifecycleModule.register(binder, CoordinatorRuleManager.class);
- binder.bind(QueryToolChestWarehouse.class).to(MapQueryToolChestWarehouse.class);
+ binder.bind(TieredBrokerHostSelector.class).in(ManageLifecycle.class);
+ binder.bind(QueryHostFinder.class).in(LazySingleton.class);
+ binder.bind(RoutingDruidClient.class).in(LazySingleton.class);
- binder.bind(BrokerSelector.class).in(ManageLifecycle.class);
- binder.bind(QuerySegmentWalker.class).to(RouterQuerySegmentWalker.class).in(LazySingleton.class);
-
- binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class).in(LazySingleton.class);
- Jerseys.addResource(binder, QueryResource.class);
- LifecycleModule.register(binder, QueryResource.class);
+ binder.bind(JettyServerInitializer.class).to(RouterJettyServerInitializer.class).in(LazySingleton.class);
LifecycleModule.register(binder, Server.class);
DiscoveryModule.register(binder, Self.class);
@@ -92,7 +85,7 @@
@Provides
@ManageLifecycle
public ServerDiscoverySelector getCoordinatorServerDiscoverySelector(
- TierConfig config,
+ TieredBrokerConfig config,
ServerDiscoveryFactory factory
)
diff --git a/services/src/main/java/io/druid/cli/QueryJettyServerInitializer.java b/services/src/main/java/io/druid/cli/QueryJettyServerInitializer.java
index f3be30e..34039e2 100644
--- a/services/src/main/java/io/druid/cli/QueryJettyServerInitializer.java
+++ b/services/src/main/java/io/druid/cli/QueryJettyServerInitializer.java
@@ -38,16 +38,13 @@
@Override
public void initialize(Server server, Injector injector)
{
- final ServletContextHandler queries = new ServletContextHandler(ServletContextHandler.SESSIONS);
- queries.setResourceBase("/");
-
final ServletContextHandler root = new ServletContextHandler(ServletContextHandler.SESSIONS);
root.addServlet(new ServletHolder(new DefaultServlet()), "/*");
root.addFilter(GzipFilter.class, "/*", null);
root.addFilter(GuiceFilter.class, "/*", null);
final HandlerList handlerList = new HandlerList();
- handlerList.setHandlers(new Handler[]{queries, root, new DefaultHandler()});
+ handlerList.setHandlers(new Handler[]{root, new DefaultHandler()});
server.setHandler(handlerList);
}
}
diff --git a/services/src/main/java/io/druid/cli/RouterJettyServerInitializer.java b/services/src/main/java/io/druid/cli/RouterJettyServerInitializer.java
new file mode 100644
index 0000000..8ed1849
--- /dev/null
+++ b/services/src/main/java/io/druid/cli/RouterJettyServerInitializer.java
@@ -0,0 +1,104 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012, 2013 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package io.druid.cli;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import com.google.inject.servlet.GuiceFilter;
+import com.metamx.emitter.service.ServiceEmitter;
+import io.druid.client.RoutingDruidClient;
+import io.druid.guice.annotations.Json;
+import io.druid.guice.annotations.Smile;
+import io.druid.server.AsyncQueryForwardingServlet;
+import io.druid.server.QueryIDProvider;
+import io.druid.server.initialization.JettyServerInitializer;
+import io.druid.server.log.RequestLogger;
+import io.druid.server.router.QueryHostFinder;
+import org.eclipse.jetty.server.Handler;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.handler.DefaultHandler;
+import org.eclipse.jetty.server.handler.HandlerList;
+import org.eclipse.jetty.servlet.DefaultServlet;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.eclipse.jetty.servlets.GzipFilter;
+
+/**
+ */
+public class RouterJettyServerInitializer implements JettyServerInitializer
+{
+ private final ObjectMapper jsonMapper;
+ private final ObjectMapper smileMapper;
+ private final QueryHostFinder hostFinder;
+ private final RoutingDruidClient routingDruidClient;
+ private final ServiceEmitter emitter;
+ private final RequestLogger requestLogger;
+ private final QueryIDProvider idProvider;
+
+ @Inject
+ public RouterJettyServerInitializer(
+ @Json ObjectMapper jsonMapper,
+ @Smile ObjectMapper smileMapper,
+ QueryHostFinder hostFinder,
+ RoutingDruidClient routingDruidClient,
+ ServiceEmitter emitter,
+ RequestLogger requestLogger,
+ QueryIDProvider idProvider
+ )
+ {
+ this.jsonMapper = jsonMapper;
+ this.smileMapper = smileMapper;
+ this.hostFinder = hostFinder;
+ this.routingDruidClient = routingDruidClient;
+ this.emitter = emitter;
+ this.requestLogger = requestLogger;
+ this.idProvider = idProvider;
+ }
+
+ @Override
+ public void initialize(Server server, Injector injector)
+ {
+ final ServletContextHandler queries = new ServletContextHandler(ServletContextHandler.SESSIONS);
+ queries.addServlet(
+ new ServletHolder(
+ new AsyncQueryForwardingServlet(
+ jsonMapper,
+ smileMapper,
+ hostFinder,
+ routingDruidClient,
+ emitter,
+ requestLogger,
+ idProvider
+ )
+ ), "/druid/v2/*"
+ );
+ queries.addFilter(GzipFilter.class, "/druid/v2/*", null);
+
+ final ServletContextHandler root = new ServletContextHandler(ServletContextHandler.SESSIONS);
+ root.addServlet(new ServletHolder(new DefaultServlet()), "/*");
+ root.addFilter(GzipFilter.class, "/*", null);
+ root.addFilter(GuiceFilter.class, "/*", null);
+
+ final HandlerList handlerList = new HandlerList();
+ handlerList.setHandlers(new Handler[]{queries, root, new DefaultHandler()});
+ server.setHandler(handlerList);
+ }
+}