// blob: 92b55f97fce6494beae4b47c2b3adb8fb304af1e [file] [log] [blame]
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.http;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Module;
import com.google.inject.servlet.GuiceFilter;
import com.metamx.common.ISE;
import com.metamx.common.config.Config;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger;
import com.metamx.druid.QueryableNode;
import com.metamx.druid.client.BrokerServerView;
import com.metamx.druid.client.CachingClusteredClient;
import com.metamx.druid.client.ClientConfig;
import com.metamx.druid.client.ClientInventoryManager;
import com.metamx.druid.client.cache.Cache;
import com.metamx.druid.client.cache.CacheConfig;
import com.metamx.druid.client.cache.CacheMonitor;
import com.metamx.druid.client.cache.MapCache;
import com.metamx.druid.client.cache.MapCacheConfig;
import com.metamx.druid.client.cache.MemcachedCache;
import com.metamx.druid.client.cache.MemcachedCacheConfig;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServiceDiscoveryConfig;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.query.QueryToolChestWarehouse;
import com.metamx.druid.query.ReflectionQueryToolChestWarehouse;
import com.metamx.druid.utils.PropUtils;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.HttpClientConfig;
import com.metamx.http.client.HttpClientInit;
import com.metamx.metrics.Monitor;
import com.netflix.curator.framework.CuratorFramework;
import com.netflix.curator.x.discovery.ServiceDiscovery;
import org.mortbay.jetty.servlet.Context;
import org.mortbay.jetty.servlet.ServletHolder;
import org.skife.config.ConfigurationObjectFactory;
import java.util.List;
import java.util.Properties;
/**
 * Node that assembles the broker-side query stack and serves it over HTTP.
 *
 * During {@link #doInit()} it wires a {@link BrokerServerView}, a {@link ClientInventoryManager}, a
 * {@link CachingClusteredClient} and a {@link ClientQuerySegmentWalker} together, then registers a
 * {@link StatusServlet} at <code>/status</code> and a {@link QueryServlet} at <code>/druid/v2/*</code>.
 *
 * The warehouse, HTTP client and cache may be supplied through the setters; any of them that is still unset when
 * it is first needed is created with a default (see the corresponding getter).
 */
public class BrokerNode extends QueryableNode<BrokerNode>
{
  private static final Logger log = new Logger(BrokerNode.class);

  /** Value of {@link CacheConfig#getType()} that selects a {@link MapCache}. */
  public static final String CACHE_TYPE_LOCAL = "local";
  /** Value of {@link CacheConfig#getType()} that selects a {@link MemcachedCache}. */
  public static final String CACHE_TYPE_MEMCACHED = "memcached";
  /** Substituted for the "prefix" replacement when the cache configuration objects are built. */
  public static final String CACHE_PROPERTY_PREFIX = "druid.bard.cache";

  private final List<Module> extraModules = Lists.newArrayList();
  private final List<String> pathsForGuiceFilter = Lists.newArrayList();

  // NOTE(review): the names of the three fields below are passed as string literals to
  // checkFieldNotSetAndSet(...), which is defined outside this file. Presumably it resolves them by name, so
  // renaming a field requires updating the matching literal in its setter -- confirm against the superclass.
  private QueryToolChestWarehouse warehouse = null;
  private HttpClient brokerHttpClient = null;
  private Cache cache = null;

  private boolean useDiscovery = true;

  /**
   * @return a fresh {@link Builder} for constructing a BrokerNode
   */
  public static Builder builder()
  {
    return new Builder();
  }

  /**
   * Creates a node; all arguments are handed to the superclass constructor together with this class's logger.
   *
   * @param props         the properties for the node
   * @param lifecycle     the lifecycle that managed instances are added to
   * @param jsonMapper    the mapper used for JSON
   * @param smileMapper   the mapper used for Smile
   * @param configFactory the factory used to build configuration objects
   */
  public BrokerNode(
      Properties props,
      Lifecycle lifecycle,
      ObjectMapper jsonMapper,
      ObjectMapper smileMapper,
      ConfigurationObjectFactory configFactory
  )
  {
    super(log, props, lifecycle, jsonMapper, smileMapper, configFactory);
  }

  /**
   * Returns the warehouse, first creating a {@link ReflectionQueryToolChestWarehouse} if none has been set.
   *
   * @return the warehouse used by this node
   */
  public QueryToolChestWarehouse getWarehouse()
  {
    initializeWarehouse();
    return warehouse;
  }

  /**
   * Sets the warehouse through checkFieldNotSetAndSet (defined outside this file; judging by its name it refuses
   * to overwrite a value that is already set -- confirm there).
   *
   * @param warehouse the warehouse to use
   *
   * @return this
   */
  public BrokerNode setWarehouse(QueryToolChestWarehouse warehouse)
  {
    checkFieldNotSetAndSet("warehouse", warehouse);
    return this;
  }

  /**
   * Returns the HTTP client, first creating one if none has been set. The default client takes its connection
   * count from the "druid.client.http.connections" property and is created with this node's lifecycle.
   *
   * @return the HTTP client used by this node
   */
  public HttpClient getBrokerHttpClient()
  {
    initializeBrokerHttpClient();
    return brokerHttpClient;
  }

  /**
   * Sets the HTTP client through checkFieldNotSetAndSet (defined outside this file; judging by its name it
   * refuses to overwrite a value that is already set -- confirm there).
   *
   * @param brokerHttpClient the HTTP client to use
   *
   * @return this
   */
  public BrokerNode setBrokerHttpClient(HttpClient brokerHttpClient)
  {
    checkFieldNotSetAndSet("brokerHttpClient", brokerHttpClient);
    return this;
  }

  /**
   * Returns the cache, first creating one from configuration if none has been set. The implementation is chosen
   * by {@link CacheConfig#getType()}: {@link #CACHE_TYPE_LOCAL} yields a {@link MapCache} and
   * {@link #CACHE_TYPE_MEMCACHED} yields a {@link MemcachedCache}.
   *
   * @return the cache used by this node
   *
   * @throws ISE if no cache has been set and the configured cache type is neither of the known types
   */
  public Cache getCache()
  {
    initializeCacheBroker();
    return cache;
  }

  /**
   * Sets the cache through checkFieldNotSetAndSet (defined outside this file; judging by its name it refuses to
   * overwrite a value that is already set -- confirm there).
   *
   * @param cache the cache to use
   *
   * @return this
   */
  public BrokerNode setCache(Cache cache)
  {
    checkFieldNotSetAndSet("cache", cache);
    return this;
  }

  /**
   * Controls whether {@link #doInit()} creates the Curator and service discovery clients. Defaults to true.
   *
   * @param useDiscovery true to create the discovery clients during initialization, false to skip them
   *
   * @return this
   */
  public BrokerNode useDiscovery(boolean useDiscovery)
  {
    this.useDiscovery = useDiscovery;
    return this;
  }

  /**
   * This method allows you to specify more Guice modules to use, primarily for injecting extra Jersey resources.
   * I'd like to remove the Guice dependency for this, but I don't know how to set up Jersey without Guice...
   *
   * This is deprecated because at some point in the future, we will eliminate the Guice dependency and anything
   * that uses this will break. Use at your own risk.
   *
   * @param module the module to register with Guice
   *
   * @return this
   */
  @Deprecated
  public BrokerNode addModule(Module module)
  {
    extraModules.add(module);
    return this;
  }

  /**
   * This method is used to specify extra paths that the GuiceFilter should pay attention to.
   *
   * This is deprecated for the same reason that addModule is deprecated.
   *
   * @param path the path that the GuiceFilter should pay attention to.
   *
   * @return this
   */
  @Deprecated
  public BrokerNode addPathForGuiceFilter(String path)
  {
    pathsForGuiceFilter.add(path);
    return this;
  }

  /**
   * Fills in any collaborator that has not been set, starts monitoring (including a {@link CacheMonitor} for the
   * cache), builds the query stack and registers the servlets and Guice filters on the server.
   *
   * @throws Exception if any part of the initialization fails
   */
  @Override
  protected void doInit() throws Exception
  {
    // Populate defaults first; everything below reads the fields directly.
    initializeWarehouse();
    initializeBrokerHttpClient();
    initializeCacheBroker();
    initializeDiscovery();

    final Lifecycle lifecycle = getLifecycle();

    final List<Monitor> monitors = getMonitors();
    monitors.add(new CacheMonitor(cache));
    startMonitoring(monitors);

    final BrokerServerView view = new BrokerServerView(warehouse, getSmileMapper(), brokerHttpClient);
    final ClientInventoryManager clientInventoryManager = new ClientInventoryManager(
        getConfigFactory().build(ClientConfig.class), getPhoneBook(), view
    );
    lifecycle.addManagedInstance(clientInventoryManager);

    final CachingClusteredClient baseClient = new CachingClusteredClient(warehouse, view, cache, getSmileMapper());
    lifecycle.addManagedInstance(baseClient);

    final ClientQuerySegmentWalker texasRanger = new ClientQuerySegmentWalker(warehouse, getEmitter(), baseClient);

    // The built-in servlet module goes first, followed by whatever was registered through addModule().
    final List<Module> theModules = Lists.newArrayList();
    theModules.add(new ClientServletModule(texasRanger, clientInventoryManager, getJsonMapper()));
    theModules.addAll(extraModules);
    final Injector injector = Guice.createInjector(theModules);

    final Context root = new Context(getServer(), "/", Context.SESSIONS);
    root.addServlet(new ServletHolder(new StatusServlet()), "/status");
    root.addServlet(
        new ServletHolder(
            new QueryServlet(getJsonMapper(), getSmileMapper(), texasRanger, getEmitter(), getRequestLogger())
        ),
        "/druid/v2/*"
    );

    root.addEventListener(new GuiceServletConfig(injector));
    root.addFilter(GuiceFilter.class, "/druid/v2/datasources/*", 0);
    for (String path : pathsForGuiceFilter) {
      root.addFilter(GuiceFilter.class, path, 0);
    }
  }

  /**
   * When discovery is enabled, creates a {@link CuratorFramework} client and a {@link ServiceDiscovery} client
   * from the {@link ServiceDiscoveryConfig}, passing this node's lifecycle to both factory methods.
   *
   * @throws Exception if either client cannot be created
   */
  private void initializeDiscovery() throws Exception
  {
    if (!useDiscovery) {
      return;
    }

    final Lifecycle lifecycle = getLifecycle();
    final ServiceDiscoveryConfig serviceDiscoveryConfig = getConfigFactory().build(ServiceDiscoveryConfig.class);
    final CuratorFramework curatorFramework = Initialization.makeCuratorFrameworkClient(
        serviceDiscoveryConfig, lifecycle
    );

    // The returned client is not needed here, so it is not kept in a local. The call itself must stay:
    // presumably the factory ties the client to the lifecycle it is given -- verify in Initialization.
    Initialization.makeServiceDiscoveryClient(curatorFramework, serviceDiscoveryConfig, lifecycle);
  }

  /**
   * Creates the cache from configuration if none has been set.
   *
   * @throws ISE if the configured cache type is neither {@link #CACHE_TYPE_LOCAL} nor
   *             {@link #CACHE_TYPE_MEMCACHED} (including when it is null)
   */
  private void initializeCacheBroker()
  {
    if (cache != null) {
      return;
    }

    final String cacheType = getConfigFactory().build(CacheConfig.class).getType();

    // Constant-first comparisons so that a null type reaches the "Unknown cache type" error below
    // instead of failing with a bare NullPointerException.
    if (CACHE_TYPE_LOCAL.equals(cacheType)) {
      setCache(
          MapCache.create(
              getConfigFactory().buildWithReplacements(
                  MapCacheConfig.class,
                  ImmutableMap.of("prefix", CACHE_PROPERTY_PREFIX)
              )
          )
      );
    } else if (CACHE_TYPE_MEMCACHED.equals(cacheType)) {
      setCache(
          MemcachedCache.create(
              getConfigFactory().buildWithReplacements(
                  MemcachedCacheConfig.class,
                  ImmutableMap.of("prefix", CACHE_PROPERTY_PREFIX)
              )
          )
      );
    } else {
      throw new ISE("Unknown cache type [%s]", cacheType);
    }
  }

  /**
   * Creates the HTTP client if none has been set, sized by the "druid.client.http.connections" property.
   */
  private void initializeBrokerHttpClient()
  {
    if (brokerHttpClient != null) {
      return;
    }

    final HttpClientConfig clientConfig = HttpClientConfig
        .builder()
        .withNumConnections(PropUtils.getPropertyAsInt(getProps(), "druid.client.http.connections"))
        .build();
    setBrokerHttpClient(HttpClientInit.createClient(clientConfig, getLifecycle()));
  }

  /**
   * Creates a {@link ReflectionQueryToolChestWarehouse} if no warehouse has been set.
   */
  private void initializeWarehouse()
  {
    if (warehouse == null) {
      setWarehouse(new ReflectionQueryToolChestWarehouse());
    }
  }

  /**
   * Builder for {@link BrokerNode}. Every setting is optional; {@link #build()} supplies a default for anything
   * that was not provided.
   */
  public static class Builder
  {
    private ObjectMapper jsonMapper = null;
    private ObjectMapper smileMapper = null;
    private Lifecycle lifecycle = null;
    private Properties props = null;
    private ConfigurationObjectFactory configFactory = null;

    /**
     * Sets both mappers. {@link #build()} rejects the case where exactly one of them is null.
     *
     * @param jsonMapper  the mapper to use for JSON
     * @param smileMapper the mapper to use for Smile
     *
     * @return this
     */
    public Builder withMappers(ObjectMapper jsonMapper, ObjectMapper smileMapper)
    {
      this.jsonMapper = jsonMapper;
      this.smileMapper = smileMapper;
      return this;
    }

    /**
     * Sets the lifecycle handed to the node. If this is never called (or null is passed), {@link #build()}
     * creates a new {@link Lifecycle}.
     *
     * @param lifecycle the lifecycle the node should use
     *
     * @return this
     */
    public Builder withLifecycle(Lifecycle lifecycle)
    {
      this.lifecycle = lifecycle;
      return this;
    }

    /**
     * Sets the properties handed to the node. If unset, {@link #build()} uses
     * {@link Initialization#loadProperties()}.
     *
     * @param props the properties the node should use
     *
     * @return this
     */
    public Builder withProps(Properties props)
    {
      this.props = props;
      return this;
    }

    /**
     * Sets the configuration factory handed to the node. If unset, {@link #build()} creates one from the
     * properties.
     *
     * @param configFactory the configuration factory the node should use
     *
     * @return this
     */
    public Builder withConfigFactory(ConfigurationObjectFactory configFactory)
    {
      this.configFactory = configFactory;
      return this;
    }

    /**
     * Fills in defaults for everything that was not provided and constructs the node.
     *
     * @return a new BrokerNode
     *
     * @throws ISE if exactly one of the two mappers was set
     */
    public BrokerNode build()
    {
      if (jsonMapper == null && smileMapper == null) {
        jsonMapper = new DefaultObjectMapper();
        smileMapper = new DefaultObjectMapper(new SmileFactory());
        smileMapper.getJsonFactory().setCodec(smileMapper);
      } else if (jsonMapper == null || smileMapper == null) {
        throw new ISE(
            "Only jsonMapper[%s] or smileMapper[%s] was set, must set neither or both.",
            jsonMapper,
            smileMapper
        );
      }

      if (lifecycle == null) {
        lifecycle = new Lifecycle();
      }

      // props must be resolved before configFactory, whose default is derived from it.
      if (props == null) {
        props = Initialization.loadProperties();
      }

      if (configFactory == null) {
        configFactory = Config.createFactory(props);
      }

      return new BrokerNode(props, lifecycle, jsonMapper, smileMapper, configFactory);
    }
  }
}