blob: 021ad751ad6754cf97a95b905c3af5d4be214bce [file] [log] [blame]
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.http;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.metamx.common.ISE;
import com.metamx.common.concurrent.ExecutorServiceConfig;
import com.metamx.common.concurrent.ExecutorServices;
import com.metamx.common.config.Config;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger;
import com.metamx.druid.BaseServerNode;
import com.metamx.druid.client.DruidServer;
import com.metamx.druid.client.DruidServerConfig;
import com.metamx.druid.coordination.ServerManager;
import com.metamx.druid.coordination.ZkCoordinator;
import com.metamx.druid.coordination.ZkCoordinatorConfig;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServerInit;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.loading.QueryableLoaderConfig;
import com.metamx.druid.loading.StorageAdapterLoader;
import com.metamx.druid.metrics.ServerMonitor;
import com.metamx.druid.query.QueryRunnerFactoryConglomerate;
import com.metamx.druid.utils.PropUtils;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.metrics.Monitor;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.smile.SmileFactory;
import org.jets3t.service.S3ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.security.AWSCredentials;
import org.mortbay.jetty.servlet.Context;
import org.mortbay.jetty.servlet.ServletHolder;
import org.skife.config.ConfigurationObjectFactory;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
/**
 * Server node that assembles the pieces needed to serve queries over HTTP: a {@link DruidServer}
 * description, a {@link StorageAdapterLoader}, a {@link ServerManager} and a {@link ZkCoordinator}.
 *
 * <p>Both the {@link DruidServer} and the {@link StorageAdapterLoader} may be injected once through
 * their setters; whatever has not been injected is created from configuration the first time it is
 * needed. Instances are normally obtained through {@link #builder()}.
 */
public class ComputeNode extends BaseServerNode<ComputeNode>
{
  private static final Logger log = new Logger(ComputeNode.class);

  /**
   * @return a fresh {@link Builder} with nothing configured
   */
  public static Builder builder()
  {
    return new Builder();
  }

  // Both fields stay null until injected via a setter or lazily created by the initialize* methods.
  private DruidServer druidServer;
  private StorageAdapterLoader adapterLoader;

  /**
   * Creates a node; all arguments are handed straight to the {@link BaseServerNode} constructor
   * together with this class's logger.
   *
   * @param props         properties used for configuration lookups
   * @param lifecycle     lifecycle that managed instances are registered with
   * @param jsonMapper    mapper used for JSON
   * @param smileMapper   mapper used for Smile
   * @param configFactory factory used to build the config objects this node needs
   */
  public ComputeNode(
      Properties props,
      Lifecycle lifecycle,
      ObjectMapper jsonMapper,
      ObjectMapper smileMapper,
      ConfigurationObjectFactory configFactory
  )
  {
    super(log, props, lifecycle, jsonMapper, smileMapper, configFactory);
  }

  /**
   * Injects the adapter loader to use instead of the default one.
   *
   * @param storageAdapterLoader the loader to use
   * @return this node, for chaining
   * @throws IllegalStateException if an adapter loader has already been set or lazily created
   */
  public ComputeNode setAdapterLoader(StorageAdapterLoader storageAdapterLoader)
  {
    Preconditions.checkState(this.adapterLoader == null, "Cannot set adapterLoader once it has already been set.");
    this.adapterLoader = storageAdapterLoader;
    return this;
  }

  /**
   * Injects the server description to use instead of the default one.
   *
   * @param druidServer the server description to use
   * @return this node, for chaining
   * @throws IllegalStateException if a server has already been set or lazily created
   */
  public ComputeNode setDruidServer(DruidServer druidServer)
  {
    Preconditions.checkState(this.druidServer == null, "Cannot set druidServer once it has already been set.");
    this.druidServer = druidServer;
    return this;
  }

  /**
   * @return the server description, creating the default one first if none has been set
   */
  public DruidServer getDruidServer()
  {
    initializeDruidServer();
    return druidServer;
  }

  /**
   * @return the adapter loader, creating the default one first if none has been set
   */
  public StorageAdapterLoader getAdapterLoader()
  {
    initializeAdapterLoader();
    return adapterLoader;
  }

  /**
   * Wires the node together: creates the processing executor, the {@link ServerManager} and the
   * {@link ZkCoordinator}, registers the coordinator with the lifecycle, starts monitoring and
   * mounts the status and query servlets.
   *
   * @throws Exception declared by the signature; nothing in this method body throws a checked
   *                   exception directly
   */
  protected void doInit() throws Exception
  {
    // Fill in defaults for anything that was not injected through the setters.
    initializeDruidServer();
    initializeAdapterLoader();

    final Lifecycle lifecycle = getLifecycle();
    final ServiceEmitter emitter = getEmitter();
    final List<Monitor> monitors = getMonitors();
    final QueryRunnerFactoryConglomerate conglomerate = getConglomerate();

    // The executor config is resolved with "base_path" replaced by "druid.processing".
    final ExecutorService executorService = ExecutorServices.create(
        lifecycle,
        getConfigFactory().buildWithReplacements(
            ExecutorServiceConfig.class, ImmutableMap.of("base_path", "druid.processing")
        )
    );

    final ServerManager serverManager = new ServerManager(adapterLoader, conglomerate, emitter, executorService);

    final ZkCoordinator coordinator = new ZkCoordinator(
        getJsonMapper(),
        getConfigFactory().build(ZkCoordinatorConfig.class),
        druidServer,
        getPhoneBook(),
        serverManager,
        emitter
    );
    lifecycle.addManagedInstance(coordinator);

    // druidServer is guaranteed non-null here because initializeDruidServer() ran above.
    monitors.add(new ServerMonitor(druidServer, serverManager));
    startMonitoring(monitors);

    final Context root = new Context(getServer(), "/", Context.SESSIONS);
    root.addServlet(new ServletHolder(new StatusServlet()), "/status");
    // Everything that is not "/status" is routed to the query servlet.
    root.addServlet(
        new ServletHolder(
            new QueryServlet(getJsonMapper(), getSmileMapper(), serverManager, emitter, getRequestLogger())
        ),
        "/*"
    );
  }

  /**
   * Creates the default adapter loader if none has been set. The default is built by
   * {@link ServerInit#makeDefaultQueryableLoader} from an S3 client whose credentials are read from
   * the properties "com.metamx.aws.accessKey" and "com.metamx.aws.secretKey".
   */
  private void initializeAdapterLoader()
  {
    if (adapterLoader == null) {
      final Properties props = getProps();
      try {
        // NOTE(review): PropUtils.getProperty presumably fails when a key is missing -- confirm.
        final RestS3Service s3Client = new RestS3Service(
            new AWSCredentials(
                PropUtils.getProperty(props, "com.metamx.aws.accessKey"),
                PropUtils.getProperty(props, "com.metamx.aws.secretKey")
            )
        );
        setAdapterLoader(
            ServerInit.makeDefaultQueryableLoader(s3Client, getConfigFactory().build(QueryableLoaderConfig.class))
        );
      }
      catch (S3ServiceException e) {
        // Rethrown through Guava so callers of the getters need not declare S3ServiceException.
        throw Throwables.propagate(e);
      }
    }
  }

  /**
   * Creates the default server description if none has been set: a {@link DruidServer} built from
   * {@link DruidServerConfig} with type "historical".
   */
  private void initializeDruidServer()
  {
    if (druidServer == null) {
      setDruidServer(new DruidServer(getConfigFactory().build(DruidServerConfig.class), "historical"));
    }
  }

  /**
   * Builder for {@link ComputeNode}. Every setting is optional; {@link #build()} fills in defaults
   * for whatever was left unset.
   */
  public static class Builder
  {
    private ObjectMapper jsonMapper = null;
    private ObjectMapper smileMapper = null;
    private Lifecycle lifecycle = null;
    private Properties props = null;
    private ConfigurationObjectFactory configFactory = null;

    /**
     * Sets both mappers. Either supply both or neither; {@link #build()} rejects a half-set pair.
     *
     * @param jsonMapper  mapper used for JSON
     * @param smileMapper mapper used for Smile
     * @return this builder
     */
    public Builder withMappers(ObjectMapper jsonMapper, ObjectMapper smileMapper)
    {
      this.jsonMapper = jsonMapper;
      this.smileMapper = smileMapper;
      return this;
    }

    /**
     * Sets the lifecycle the node should use. When left unset (or set to null), {@link #build()}
     * creates a new {@link Lifecycle}.
     *
     * @param lifecycle lifecycle to hand to the node
     * @return this builder
     */
    public Builder withLifecycle(Lifecycle lifecycle)
    {
      this.lifecycle = lifecycle;
      return this;
    }

    /**
     * Sets the properties. When left unset, {@link #build()} uses
     * {@link Initialization#loadProperties()}.
     *
     * @param props properties to hand to the node
     * @return this builder
     */
    public Builder withProps(Properties props)
    {
      this.props = props;
      return this;
    }

    /**
     * Sets the config factory. When left unset, {@link #build()} creates one from the properties
     * via {@link Config#createFactory}.
     *
     * @param configFactory factory to hand to the node
     * @return this builder
     */
    public Builder withConfigFactory(ConfigurationObjectFactory configFactory)
    {
      this.configFactory = configFactory;
      return this;
    }

    /**
     * Creates the node, filling in defaults for unset values. The defaults are stored back on this
     * builder, so a later call to {@code build()} on the same builder reuses them.
     *
     * @return a new {@link ComputeNode}
     * @throws ISE if exactly one of the two mappers was supplied
     */
    public ComputeNode build()
    {
      if (jsonMapper == null && smileMapper == null) {
        jsonMapper = new DefaultObjectMapper();
        smileMapper = new DefaultObjectMapper(new SmileFactory());
        // Point the Smile factory back at its mapper so it is used as the factory's codec.
        smileMapper.getJsonFactory().setCodec(smileMapper);
      }
      else if (jsonMapper == null || smileMapper == null) {
        throw new ISE(
            "Only jsonMapper[%s] or smileMapper[%s] was set, must set neither or both.",
            jsonMapper,
            smileMapper
        );
      }

      if (lifecycle == null) {
        lifecycle = new Lifecycle();
      }

      if (props == null) {
        props = Initialization.loadProperties();
      }

      // Must come after props is resolved: the default factory is derived from props.
      if (configFactory == null) {
        configFactory = Config.createFactory(props);
      }

      return new ComputeNode(props, lifecycle, jsonMapper, smileMapper, configFactory);
    }
  }
}