blob: 5452acac16309e67c3667b48829ecb7ea347b0ba [file] [log] [blame]
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.merger.coordinator.http;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.ec2.AmazonEC2Client;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Charsets;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.servlet.GuiceFilter;
import com.metamx.common.ISE;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.config.Config;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
import com.metamx.druid.RegisteringNode;
import com.metamx.druid.config.ConfigManager;
import com.metamx.druid.config.ConfigManagerConfig;
import com.metamx.druid.config.JacksonConfigManager;
import com.metamx.druid.db.DbConnector;
import com.metamx.druid.db.DbConnectorConfig;
import com.metamx.druid.http.GuiceServletConfig;
import com.metamx.druid.http.MasterMain;
import com.metamx.druid.http.RedirectFilter;
import com.metamx.druid.http.RedirectInfo;
import com.metamx.druid.http.StatusServlet;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServerConfig;
import com.metamx.druid.initialization.ServerInit;
import com.metamx.druid.initialization.ServiceDiscoveryConfig;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.loading.DataSegmentKiller;
import com.metamx.druid.loading.DataSegmentPusher;
import com.metamx.druid.loading.S3DataSegmentKiller;
import com.metamx.druid.merger.common.TaskToolboxFactory;
import com.metamx.druid.merger.common.actions.LocalTaskActionClientFactory;
import com.metamx.druid.merger.common.actions.TaskActionToolbox;
import com.metamx.druid.merger.common.config.IndexerZkConfig;
import com.metamx.druid.merger.common.config.TaskConfig;
import com.metamx.druid.merger.common.index.StaticS3FirehoseFactory;
import com.metamx.druid.merger.coordinator.DbTaskStorage;
import com.metamx.druid.merger.coordinator.HeapMemoryTaskStorage;
import com.metamx.druid.merger.coordinator.LocalTaskRunner;
import com.metamx.druid.merger.coordinator.MergerDBCoordinator;
import com.metamx.druid.merger.coordinator.RemoteTaskRunner;
import com.metamx.druid.merger.coordinator.RetryPolicyFactory;
import com.metamx.druid.merger.coordinator.TaskLockbox;
import com.metamx.druid.merger.coordinator.TaskMasterLifecycle;
import com.metamx.druid.merger.coordinator.TaskQueue;
import com.metamx.druid.merger.coordinator.TaskRunner;
import com.metamx.druid.merger.coordinator.TaskRunnerFactory;
import com.metamx.druid.merger.coordinator.TaskStorage;
import com.metamx.druid.merger.coordinator.TaskStorageQueryAdapter;
import com.metamx.druid.merger.coordinator.config.EC2AutoScalingStrategyConfig;
import com.metamx.druid.merger.coordinator.config.IndexerCoordinatorConfig;
import com.metamx.druid.merger.coordinator.config.IndexerDbConnectorConfig;
import com.metamx.druid.merger.coordinator.config.RemoteTaskRunnerConfig;
import com.metamx.druid.merger.coordinator.config.RetryPolicyConfig;
import com.metamx.druid.merger.coordinator.scaling.AutoScalingStrategy;
import com.metamx.druid.merger.coordinator.scaling.EC2AutoScalingStrategy;
import com.metamx.druid.merger.coordinator.scaling.NoopAutoScalingStrategy;
import com.metamx.druid.merger.coordinator.scaling.ResourceManagementScheduler;
import com.metamx.druid.merger.coordinator.scaling.ResourceManagementSchedulerConfig;
import com.metamx.druid.merger.coordinator.scaling.ResourceManagementSchedulerFactory;
import com.metamx.druid.merger.coordinator.scaling.SimpleResourceManagementStrategy;
import com.metamx.druid.merger.coordinator.scaling.SimpleResourceManagmentConfig;
import com.metamx.druid.merger.coordinator.setup.WorkerSetupData;
import com.metamx.druid.utils.PropUtils;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.core.Emitters;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.HttpClientConfig;
import com.metamx.http.client.HttpClientInit;
import com.metamx.http.client.response.ToStringResponseHandler;
import com.metamx.metrics.JvmMonitor;
import com.metamx.metrics.Monitor;
import com.metamx.metrics.MonitorScheduler;
import com.metamx.metrics.MonitorSchedulerConfig;
import com.metamx.metrics.SysMonitor;
import com.netflix.curator.framework.CuratorFramework;
import com.netflix.curator.framework.recipes.cache.PathChildrenCache;
import org.jets3t.service.S3ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.security.AWSCredentials;
import org.mortbay.jetty.Server;
import org.mortbay.jetty.servlet.Context;
import org.mortbay.jetty.servlet.DefaultServlet;
import org.mortbay.jetty.servlet.FilterHolder;
import org.mortbay.jetty.servlet.ServletHolder;
import org.mortbay.resource.ResourceCollection;
import org.skife.config.ConfigurationObjectFactory;
import org.skife.jdbi.v2.DBI;
import java.net.URL;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;
/**
*/
public class IndexerCoordinatorNode extends RegisteringNode
{
private static final Logger log = new Logger(IndexerCoordinatorNode.class);
public static Builder builder()
{
return new Builder();
}
private final ObjectMapper jsonMapper;
private final Lifecycle lifecycle;
private final Properties props;
private final ConfigurationObjectFactory configFactory;
private List<Monitor> monitors = null;
private ServiceEmitter emitter = null;
private DbConnectorConfig dbConnectorConfig = null;
private DBI dbi = null;
private RestS3Service s3Service = null;
private IndexerCoordinatorConfig config = null;
private TaskConfig taskConfig = null;
private DataSegmentPusher segmentPusher = null;
private TaskToolboxFactory taskToolboxFactory = null;
private MergerDBCoordinator mergerDBCoordinator = null;
private TaskStorage taskStorage = null;
private TaskQueue taskQueue = null;
private TaskLockbox taskLockbox = null;
private CuratorFramework curatorFramework = null;
private ScheduledExecutorFactory scheduledExecutorFactory = null;
private IndexerZkConfig indexerZkConfig;
private TaskRunnerFactory taskRunnerFactory = null;
private ResourceManagementSchedulerFactory resourceManagementSchedulerFactory = null;
private TaskMasterLifecycle taskMasterLifecycle = null;
private Server server = null;
private boolean initialized = false;
public IndexerCoordinatorNode(
ObjectMapper jsonMapper,
Lifecycle lifecycle,
Properties props,
ConfigurationObjectFactory configFactory
)
{
super(Arrays.asList(jsonMapper));
this.jsonMapper = jsonMapper;
this.lifecycle = lifecycle;
this.props = props;
this.configFactory = configFactory;
}
public IndexerCoordinatorNode setEmitter(ServiceEmitter emitter)
{
this.emitter = emitter;
return this;
}
public IndexerCoordinatorNode setMergerDBCoordinator(MergerDBCoordinator mergerDBCoordinator)
{
this.mergerDBCoordinator = mergerDBCoordinator;
return this;
}
public IndexerCoordinatorNode setTaskQueue(TaskQueue taskQueue)
{
this.taskQueue = taskQueue;
return this;
}
public IndexerCoordinatorNode setS3Service(RestS3Service s3Service)
{
this.s3Service = s3Service;
return this;
}
public IndexerCoordinatorNode setTaskLockbox(TaskLockbox taskLockbox)
{
this.taskLockbox = taskLockbox;
return this;
}
public IndexerCoordinatorNode setSegmentPusher(DataSegmentPusher segmentPusher)
{
this.segmentPusher = segmentPusher;
return this;
}
public IndexerCoordinatorNode setMergeDbCoordinator(MergerDBCoordinator mergeDbCoordinator)
{
this.mergerDBCoordinator = mergeDbCoordinator;
return this;
}
public IndexerCoordinatorNode setCuratorFramework(CuratorFramework curatorFramework)
{
this.curatorFramework = curatorFramework;
return this;
}
public IndexerCoordinatorNode setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory)
{
this.taskRunnerFactory = taskRunnerFactory;
return this;
}
public IndexerCoordinatorNode setResourceManagementSchedulerFactory(ResourceManagementSchedulerFactory resourceManagementSchedulerFactory)
{
this.resourceManagementSchedulerFactory = resourceManagementSchedulerFactory;
return this;
}
public void init() throws Exception
{
scheduledExecutorFactory = ScheduledExecutors.createFactory(lifecycle);
final ConfigManagerConfig managerConfig = configFactory.build(ConfigManagerConfig.class);
DbConnector.createConfigTable(dbi, managerConfig.getConfigTable());
JacksonConfigManager configManager = new JacksonConfigManager(new ConfigManager(dbi, managerConfig), jsonMapper);
initializeEmitter();
initializeMonitors();
initializeDB();
initializeIndexerCoordinatorConfig();
initializeTaskConfig();
initializeS3Service();
initializeMergeDBCoordinator();
initializeTaskStorage();
initializeTaskLockbox();
initializeTaskQueue();
initializeDataSegmentPusher();
initializeTaskToolbox();
initializeJacksonInjections();
initializeJacksonSubtypes();
initializeCurator();
initializeIndexerZkConfig();
initializeTaskRunnerFactory(configManager);
initializeResourceManagement(configManager);
initializeTaskMasterLifecycle();
initializeServer();
final ScheduledExecutorService globalScheduledExec = scheduledExecutorFactory.create(1, "Global--%d");
final MonitorScheduler monitorScheduler = new MonitorScheduler(
configFactory.build(MonitorSchedulerConfig.class),
globalScheduledExec,
emitter,
monitors
);
lifecycle.addManagedInstance(monitorScheduler);
final Injector injector = Guice.createInjector(
new IndexerCoordinatorServletModule(
jsonMapper,
config,
emitter,
taskMasterLifecycle,
new TaskStorageQueryAdapter(taskStorage),
configManager
)
);
final Context staticContext = new Context(server, "/static", Context.SESSIONS);
staticContext.addServlet(new ServletHolder(new DefaultServlet()), "/*");
ResourceCollection resourceCollection = new ResourceCollection(new String[] {
IndexerCoordinatorNode.class.getClassLoader().getResource("static").toExternalForm(),
IndexerCoordinatorNode.class.getClassLoader().getResource("indexer_static").toExternalForm()
});
staticContext.setBaseResource(resourceCollection);
final Context root = new Context(server, "/", Context.SESSIONS);
root.addServlet(new ServletHolder(new StatusServlet()), "/status");
root.addServlet(new ServletHolder(new DefaultServlet()), "/mmx/*");
root.addEventListener(new GuiceServletConfig(injector));
root.addFilter(
new FilterHolder(
new RedirectFilter(
new ToStringResponseHandler(Charsets.UTF_8),
new RedirectInfo()
{
@Override
public boolean doLocal()
{
return taskMasterLifecycle.isLeading();
}
@Override
public URL getRedirectURL(String queryString, String requestURI)
{
try {
return new URL(
String.format(
"http://%s%s",
taskMasterLifecycle.getLeader(),
requestURI
)
);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
}
)
), "/*", 0
);
root.addFilter(GuiceFilter.class, "/mmx/merger/v1/*", 0);
initialized = true;
}
private void initializeTaskMasterLifecycle()
{
if (taskMasterLifecycle == null) {
final ServiceDiscoveryConfig serviceDiscoveryConfig = configFactory.build(ServiceDiscoveryConfig.class);
taskMasterLifecycle = new TaskMasterLifecycle(
taskQueue,
taskToolboxFactory,
config,
serviceDiscoveryConfig,
taskRunnerFactory,
resourceManagementSchedulerFactory,
curatorFramework,
emitter
);
lifecycle.addManagedInstance(taskMasterLifecycle);
}
}
@LifecycleStart
public synchronized void start() throws Exception
{
if (!initialized) {
init();
}
lifecycle.start();
}
@LifecycleStop
public synchronized void stop()
{
lifecycle.stop();
}
private void initializeServer()
{
if (server == null) {
server = Initialization.makeJettyServer(configFactory.build(ServerConfig.class));
lifecycle.addHandler(
new Lifecycle.Handler()
{
@Override
public void start() throws Exception
{
log.info("Starting Jetty");
server.start();
}
@Override
public void stop()
{
log.info("Stopping Jetty");
try {
server.stop();
}
catch (Exception e) {
log.error(e, "Exception thrown while stopping Jetty");
}
}
}
);
}
}
private void initializeJacksonInjections()
{
InjectableValues.Std injectables = new InjectableValues.Std();
injectables.addValue("s3Client", s3Service)
.addValue("segmentPusher", segmentPusher);
jsonMapper.setInjectableValues(injectables);
}
private void initializeJacksonSubtypes()
{
jsonMapper.registerSubtypes(StaticS3FirehoseFactory.class);
}
private void initializeEmitter()
{
if (emitter == null) {
final HttpClient httpClient = HttpClientInit.createClient(
HttpClientConfig.builder().withNumConnections(1).build(), lifecycle
);
emitter = new ServiceEmitter(
PropUtils.getProperty(props, "druid.service"),
PropUtils.getProperty(props, "druid.host"),
Emitters.create(props, httpClient, jsonMapper, lifecycle)
);
}
EmittingLogger.registerEmitter(emitter);
}
private void initializeMonitors()
{
if (monitors == null) {
monitors = Lists.newArrayList();
monitors.add(new JvmMonitor());
monitors.add(new SysMonitor());
}
}
private void initializeDB()
{
if (dbConnectorConfig == null) {
dbConnectorConfig = configFactory.build(DbConnectorConfig.class);
}
if (dbi == null) {
dbi = new DbConnector(dbConnectorConfig).getDBI();
}
}
private void initializeIndexerCoordinatorConfig()
{
if (config == null) {
config = configFactory.build(IndexerCoordinatorConfig.class);
}
}
private void initializeTaskConfig()
{
if (taskConfig == null) {
taskConfig = configFactory.build(TaskConfig.class);
}
}
public void initializeS3Service() throws S3ServiceException
{
this.s3Service = new RestS3Service(
new AWSCredentials(
PropUtils.getProperty(props, "com.metamx.aws.accessKey"),
PropUtils.getProperty(props, "com.metamx.aws.secretKey")
)
);
}
public void initializeDataSegmentPusher()
{
if (segmentPusher == null) {
segmentPusher = ServerInit.getSegmentPusher(props, configFactory, jsonMapper);
}
}
public void initializeTaskToolbox()
{
if (taskToolboxFactory == null) {
final DataSegmentKiller dataSegmentKiller = new S3DataSegmentKiller(s3Service);
taskToolboxFactory = new TaskToolboxFactory(
taskConfig,
new LocalTaskActionClientFactory(
taskStorage,
new TaskActionToolbox(taskQueue, taskLockbox, mergerDBCoordinator, emitter)
),
emitter,
s3Service,
segmentPusher,
dataSegmentKiller,
jsonMapper
);
}
}
public void initializeMergeDBCoordinator()
{
if (mergerDBCoordinator == null) {
mergerDBCoordinator = new MergerDBCoordinator(
jsonMapper,
dbConnectorConfig,
dbi
);
}
}
public void initializeTaskQueue()
{
if (taskQueue == null) {
// Don't start it here. The TaskMasterLifecycle will handle that when it feels like it.
taskQueue = new TaskQueue(taskStorage, taskLockbox);
}
}
public void initializeTaskLockbox()
{
if (taskLockbox == null) {
taskLockbox = new TaskLockbox(taskStorage);
}
}
public void initializeCurator() throws Exception
{
if (curatorFramework == null) {
final ServiceDiscoveryConfig serviceDiscoveryConfig = configFactory.build(ServiceDiscoveryConfig.class);
curatorFramework = Initialization.makeCuratorFrameworkClient(
serviceDiscoveryConfig,
lifecycle
);
}
}
public void initializeIndexerZkConfig()
{
if (indexerZkConfig == null) {
indexerZkConfig = configFactory.build(IndexerZkConfig.class);
}
}
public void initializeTaskStorage()
{
if (taskStorage == null) {
if (config.getStorageImpl().equals("local")) {
taskStorage = new HeapMemoryTaskStorage();
} else if (config.getStorageImpl().equals("db")) {
final IndexerDbConnectorConfig dbConnectorConfig = configFactory.build(IndexerDbConnectorConfig.class);
taskStorage = new DbTaskStorage(jsonMapper, dbConnectorConfig, new DbConnector(dbConnectorConfig).getDBI());
} else {
throw new ISE("Invalid storage implementation: %s", config.getStorageImpl());
}
}
}
private void initializeTaskRunnerFactory(final JacksonConfigManager configManager)
{
if (taskRunnerFactory == null) {
if (config.getRunnerImpl().equals("remote")) {
taskRunnerFactory = new TaskRunnerFactory()
{
@Override
public TaskRunner build()
{
// Don't use scheduledExecutorFactory, since it's linked to the wrong lifecycle (global lifecycle instead
// of leadership lifecycle)
final ScheduledExecutorService retryScheduledExec = Executors.newScheduledThreadPool(
1,
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("RemoteRunnerRetryExec--%d")
.build()
);
RemoteTaskRunner remoteTaskRunner = new RemoteTaskRunner(
jsonMapper,
configFactory.build(RemoteTaskRunnerConfig.class),
curatorFramework,
new PathChildrenCache(curatorFramework, indexerZkConfig.getAnnouncementPath(), true),
retryScheduledExec,
new RetryPolicyFactory(configFactory.build(RetryPolicyConfig.class)),
configManager.watch(WorkerSetupData.CONFIG_KEY, WorkerSetupData.class)
);
return remoteTaskRunner;
}
};
} else if (config.getRunnerImpl().equals("local")) {
taskRunnerFactory = new TaskRunnerFactory()
{
@Override
public TaskRunner build()
{
final ExecutorService runnerExec = Executors.newFixedThreadPool(config.getNumLocalThreads());
return new LocalTaskRunner(taskToolboxFactory, runnerExec);
}
};
} else {
throw new ISE("Invalid runner implementation: %s", config.getRunnerImpl());
}
}
}
private void initializeResourceManagement(final JacksonConfigManager configManager)
{
if (resourceManagementSchedulerFactory == null) {
resourceManagementSchedulerFactory = new ResourceManagementSchedulerFactory()
{
@Override
public ResourceManagementScheduler build(TaskRunner runner)
{
final ScheduledExecutorService scalingScheduledExec = Executors.newScheduledThreadPool(
1,
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("ScalingExec--%d")
.build()
);
final AtomicReference<WorkerSetupData> workerSetupData = configManager.watch(
WorkerSetupData.CONFIG_KEY, WorkerSetupData.class
);
AutoScalingStrategy strategy;
if (config.getStrategyImpl().equalsIgnoreCase("ec2")) {
strategy = new EC2AutoScalingStrategy(
jsonMapper,
new AmazonEC2Client(
new BasicAWSCredentials(
PropUtils.getProperty(props, "com.metamx.aws.accessKey"),
PropUtils.getProperty(props, "com.metamx.aws.secretKey")
)
),
configFactory.build(EC2AutoScalingStrategyConfig.class),
workerSetupData
);
} else if (config.getStrategyImpl().equalsIgnoreCase("noop")) {
strategy = new NoopAutoScalingStrategy();
} else {
throw new ISE("Invalid strategy implementation: %s", config.getStrategyImpl());
}
return new ResourceManagementScheduler(
runner,
new SimpleResourceManagementStrategy(
strategy,
configFactory.build(SimpleResourceManagmentConfig.class),
workerSetupData
),
configFactory.build(ResourceManagementSchedulerConfig.class),
scalingScheduledExec
);
}
};
}
}
public static class Builder
{
private ObjectMapper jsonMapper = null;
private Lifecycle lifecycle = null;
private Properties props = null;
private ConfigurationObjectFactory configFactory = null;
public Builder withMapper(ObjectMapper jsonMapper)
{
this.jsonMapper = jsonMapper;
return this;
}
public Builder withLifecycle(Lifecycle lifecycle)
{
this.lifecycle = lifecycle;
return this;
}
public Builder withProps(Properties props)
{
this.props = props;
return this;
}
public Builder withConfigFactory(ConfigurationObjectFactory configFactory)
{
this.configFactory = configFactory;
return this;
}
public IndexerCoordinatorNode build()
{
if (jsonMapper == null) {
jsonMapper = new DefaultObjectMapper();
}
if (lifecycle == null) {
lifecycle = new Lifecycle();
}
if (props == null) {
props = Initialization.loadProperties();
}
if (configFactory == null) {
configFactory = Config.createFactory(props);
}
return new IndexerCoordinatorNode(jsonMapper, lifecycle, props, configFactory);
}
}
}