blob: 6f7cacda8b343f5e96a6072c2ae4fd5381d4991c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.cli;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Inject;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.TypeLiteral;
import com.google.inject.multibindings.MapBinder;
import com.google.inject.name.Named;
import com.google.inject.name.Names;
import io.airlift.airline.Arguments;
import io.airlift.airline.Command;
import io.airlift.airline.Option;
import io.netty.util.SuppressForbidden;
import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.client.indexing.HttpIndexingServiceClient;
import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.discovery.NodeRole;
import org.apache.druid.guice.Binders;
import org.apache.druid.guice.CacheModule;
import org.apache.druid.guice.DruidProcessingModule;
import org.apache.druid.guice.IndexingServiceFirehoseModule;
import org.apache.druid.guice.IndexingServiceInputSourceModule;
import org.apache.druid.guice.IndexingServiceTuningConfigModule;
import org.apache.druid.guice.Jerseys;
import org.apache.druid.guice.JoinableFactoryModule;
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.LifecycleModule;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.guice.PolyBind;
import org.apache.druid.guice.QueryRunnerFactoryModule;
import org.apache.druid.guice.QueryableModule;
import org.apache.druid.guice.QueryablePeonModule;
import org.apache.druid.guice.ServerTypeConfig;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.guice.annotations.Parent;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.indexing.common.RetryPolicyConfig;
import org.apache.druid.indexing.common.RetryPolicyFactory;
import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter;
import org.apache.druid.indexing.common.TaskReportFileWriter;
import org.apache.druid.indexing.common.TaskToolboxFactory;
import org.apache.druid.indexing.common.actions.LocalTaskActionClientFactory;
import org.apache.druid.indexing.common.actions.RemoteTaskActionClientFactory;
import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
import org.apache.druid.indexing.common.actions.TaskActionToolbox;
import org.apache.druid.indexing.common.actions.TaskAuditLogConfig;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskStorageConfig;
import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.IndexTaskClientFactory;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.batch.parallel.HttpShuffleClient;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTaskClient;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTaskClientFactory;
import org.apache.druid.indexing.common.task.batch.parallel.ShuffleClient;
import org.apache.druid.indexing.overlord.HeapMemoryTaskStorage;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner;
import org.apache.druid.indexing.overlord.TaskRunner;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.worker.executor.ExecutorLifecycle;
import org.apache.druid.indexing.worker.executor.ExecutorLifecycleConfig;
import org.apache.druid.java.util.common.lifecycle.Lifecycle;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
import org.apache.druid.metadata.input.InputSourceModule;
import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.query.lookup.LookupModule;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.loading.DataSegmentArchiver;
import org.apache.druid.segment.loading.DataSegmentKiller;
import org.apache.druid.segment.loading.DataSegmentMover;
import org.apache.druid.segment.loading.OmniDataSegmentArchiver;
import org.apache.druid.segment.loading.OmniDataSegmentKiller;
import org.apache.druid.segment.loading.OmniDataSegmentMover;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.appenderator.PeonAppenderatorsManager;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
import org.apache.druid.segment.realtime.firehose.ServiceAnnouncingChatHandlerProvider;
import org.apache.druid.segment.realtime.plumber.CoordinatorBasedSegmentHandoffNotifierConfig;
import org.apache.druid.segment.realtime.plumber.CoordinatorBasedSegmentHandoffNotifierFactory;
import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.ResponseContextConfig;
import org.apache.druid.server.SegmentManager;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordination.ZkCoordinator;
import org.apache.druid.server.http.HistoricalResource;
import org.apache.druid.server.http.SegmentListerResource;
import org.apache.druid.server.initialization.jetty.ChatHandlerServerModule;
import org.apache.druid.server.initialization.jetty.JettyServerInitializer;
import org.apache.druid.server.metrics.DataSourceTaskIdHolder;
import org.eclipse.jetty.server.Server;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Properties;
import java.util.Set;
/**
 * Runs a Peon: a single forked indexing-service task executed in its own process.
 *
 * <p>The command requires three positional arguments, in order: the task JSON file to execute, the file the task
 * status is written to, and the file task reports are written to. Any additional positional arguments are ignored.
 * The arguments are consumed while the Guice modules returned by {@link #getModules()} are configured.
 */
@Command(
    name = "peon",
    description = "Runs a Peon, this is an individual forked \"task\" used as part of the indexing service. "
                  + "This should rarely, if ever, be used directly. "
                  + "See https://druid.apache.org/docs/latest/design/peons.html for a description"
)
public class CliPeon extends GuiceRunnable
{
  /**
   * Minimum number of positional arguments: task file, status file and report file.
   */
  private static final int REQUIRED_POSITIONAL_ARGUMENTS = 3;

  @SuppressWarnings("WeakerAccess")
  @Arguments(description = "task.json status.json report.json", required = true)
  public List<String> taskAndStatusFile;

  // path of the task JSON file to execute (first positional argument); it is read by readTask(), it is NOT a log file
  private String taskPath;

  // path to store the task's TaskStatus (second positional argument)
  private String taskStatusPath;

  // path to store the task's TaskReport objects (third positional argument)
  private String taskReportPath;

  /**
   * Still using --nodeType as the flag for backward compatibility, although the concept is now more precisely called
   * "serverType".
   */
  @Option(name = "--nodeType", title = "nodeType", description = "Set the node type to expose on ZK")
  public String serverType = "indexer-executor";

  /**
   * If set to "true", the peon will bind classes necessary for loading broadcast segments. This is used for
   * queryable tasks, such as streaming ingestion tasks.
   */
  @Option(name = "--loadBroadcastSegments", title = "loadBroadcastSegments", description = "Enable loading of broadcast segments")
  public String loadBroadcastSegments = "false";

  private static final Logger log = new Logger(CliPeon.class);

  @Inject
  private Properties properties;

  public CliPeon()
  {
    super(log);
  }

  /**
   * Returns the Guice modules that make up a Peon process.
   *
   * <p>The anonymous module reads the positional arguments, binds the executor lifecycle, the task runner, the
   * task-action clients and the segment handlers, and provides the {@link Task} parsed from the task file, together
   * with its data source and id under the {@link DataSourceTaskIdHolder} names.
   */
  @Override
  protected List<? extends Module> getModules()
  {
    return ImmutableList.of(
        new DruidProcessingModule(),
        new QueryableModule(),
        new QueryRunnerFactoryModule(),
        new JoinableFactoryModule(),
        new Module()
        {
          @Override
          public void configure(Binder binder)
          {
            // Fail with an explicit message rather than an IndexOutOfBoundsException/NPE from the get() calls below.
            if (taskAndStatusFile == null || taskAndStatusFile.size() < REQUIRED_POSITIONAL_ARGUMENTS) {
              throw new IllegalArgumentException(
                  "Expected " + REQUIRED_POSITIONAL_ARGUMENTS
                  + " positional arguments [task.json status.json report.json] but got " + taskAndStatusFile
              );
            }
            taskPath = taskAndStatusFile.get(0);
            taskStatusPath = taskAndStatusFile.get(1);
            taskReportPath = taskAndStatusFile.get(2);

            binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/peon");
            binder.bindConstant().annotatedWith(Names.named("servicePort")).to(0);
            binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(-1);
            binder.bind(ResponseContextConfig.class).toInstance(ResponseContextConfig.newConfig(true));
            JsonConfigProvider.bind(binder, "druid.task.executor", DruidNode.class, Parent.class);

            bindRowIngestionMeters(binder);
            bindChatHandler(binder);
            bindTaskConfigAndClients(binder);
            bindPeonDataSegmentHandlers(binder);

            binder.bind(ExecutorLifecycle.class).in(ManageLifecycle.class);
            LifecycleModule.register(binder, ExecutorLifecycle.class);
            binder.bind(ExecutorLifecycleConfig.class).toInstance(
                new ExecutorLifecycleConfig()
                    .setTaskFile(new File(taskPath))
                    .setStatusFile(new File(taskStatusPath))
            );

            binder.bind(TaskReportFileWriter.class)
                  .toInstance(new SingleFileTaskReportFileWriter(new File(taskReportPath)));

            // The single background runner serves both as the task runner and as the query walker for this task.
            binder.bind(TaskRunner.class).to(SingleTaskBackgroundRunner.class);
            binder.bind(QuerySegmentWalker.class).to(SingleTaskBackgroundRunner.class);
            binder.bind(SingleTaskBackgroundRunner.class).in(ManageLifecycle.class);

            bindRealtimeCache(binder);
            bindCoordinatorHandoffNotiferAndClient(binder);

            binder.bind(AppenderatorsManager.class)
                  .to(PeonAppenderatorsManager.class)
                  .in(LazySingleton.class);

            binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class);
            Jerseys.addResource(binder, SegmentListerResource.class);
            binder.bind(ServerTypeConfig.class).toInstance(new ServerTypeConfig(ServerType.fromString(serverType)));
            LifecycleModule.register(binder, Server.class);

            // Only queryable tasks (see the --loadBroadcastSegments option) need the segment-loading machinery.
            if ("true".equals(loadBroadcastSegments)) {
              binder.bind(SegmentManager.class).in(LazySingleton.class);
              binder.bind(ZkCoordinator.class).in(ManageLifecycle.class);
              Jerseys.addResource(binder, HistoricalResource.class);
              LifecycleModule.register(binder, ZkCoordinator.class);
            }
          }

          /**
           * Deserializes the task from the task file configured on {@link ExecutorLifecycleConfig}.
           *
           * @throws RuntimeException wrapping the {@link IOException} if the file cannot be read or parsed
           */
          @Provides
          @LazySingleton
          public Task readTask(@Json ObjectMapper mapper, ExecutorLifecycleConfig config)
          {
            try {
              return mapper.readValue(config.getTaskFile(), Task.class);
            }
            catch (IOException e) {
              throw new RuntimeException("Failed to read task from file [" + config.getTaskFile() + "]", e);
            }
          }

          /**
           * Exposes the data source of the task being run.
           */
          @Provides
          @LazySingleton
          @Named(DataSourceTaskIdHolder.DATA_SOURCE_BINDING)
          public String getDataSourceFromTask(final Task task)
          {
            return task.getDataSource();
          }

          /**
           * Exposes the id of the task being run.
           */
          @Provides
          @LazySingleton
          @Named(DataSourceTaskIdHolder.TASK_ID_BINDING)
          public String getTaskIDFromTask(final Task task)
          {
            return task.getId();
          }
        },
        new QueryablePeonModule(),
        new IndexingServiceFirehoseModule(),
        new IndexingServiceInputSourceModule(),
        new IndexingServiceTuningConfigModule(),
        new InputSourceModule(),
        new ChatHandlerServerModule(properties),
        new LookupModule()
    );
  }

  /**
   * Builds the injector, starts the lifecycle, waits on {@link ExecutorLifecycle#join()} and then stops the
   * lifecycle. Any failure after the injector is built is printed to stderr and terminates the JVM with exit code 1.
   */
  @SuppressForbidden(reason = "System#out, System#err")
  @Override
  public void run()
  {
    try {
      Injector injector = makeInjector();
      try {
        final Lifecycle lifecycle = initLifecycle(injector);
        // Stops the lifecycle if the JVM shuts down before the explicit stop below is reached.
        final Thread hook = new Thread(
            () -> {
              log.info("Running shutdown hook");
              lifecycle.stop();
            }
        );
        Runtime.getRuntime().addShutdownHook(hook);
        injector.getInstance(ExecutorLifecycle.class).join();

        // Sanity check to help debug unexpected non-daemon threads
        final Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
        for (Thread thread : threadSet) {
          if (!thread.isDaemon() && thread != Thread.currentThread()) {
            log.info("Thread [%s] is non daemon.", thread);
          }
        }

        // Explicitly call lifecycle stop, don't rely on shutdown hook.
        lifecycle.stop();
        try {
          Runtime.getRuntime().removeShutdownHook(hook);
        }
        catch (IllegalStateException e) {
          // removeShutdownHook throws IllegalStateException once the JVM has begun shutting down.
          System.err.println("Cannot remove shutdown hook, already shutting down!");
        }
      }
      catch (Throwable t) {
        System.err.println("Error!");
        System.err.println(Throwables.getStackTraceAsString(t));
        System.exit(1);
      }
      System.out.println("Finished peon task");
    }
    catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Binds {@link RowIngestionMetersFactory} as a choice keyed by {@code druid.indexer.task.rowIngestionMeters.type},
   * with the "dropwizard" option ({@link DropwizardRowIngestionMetersFactory}) as the default.
   */
  static void bindRowIngestionMeters(Binder binder)
  {
    PolyBind.createChoice(
        binder,
        "druid.indexer.task.rowIngestionMeters.type",
        Key.get(RowIngestionMetersFactory.class),
        Key.get(DropwizardRowIngestionMetersFactory.class)
    );
    final MapBinder<String, RowIngestionMetersFactory> rowIngestionMetersHandlerProviderBinder =
        PolyBind.optionBinder(binder, Key.get(RowIngestionMetersFactory.class));
    rowIngestionMetersHandlerProviderBinder
        .addBinding("dropwizard")
        .to(DropwizardRowIngestionMetersFactory.class)
        .in(LazySingleton.class);
    binder.bind(DropwizardRowIngestionMetersFactory.class).in(LazySingleton.class);
  }

  /**
   * Binds {@link ChatHandlerProvider} as a choice keyed by {@code druid.indexer.task.chathandler.type}, with the
   * options "announce" ({@link ServiceAnnouncingChatHandlerProvider}, the default) and "noop"
   * ({@link NoopChatHandlerProvider}).
   */
  static void bindChatHandler(Binder binder)
  {
    PolyBind.createChoice(
        binder,
        "druid.indexer.task.chathandler.type",
        Key.get(ChatHandlerProvider.class),
        Key.get(ServiceAnnouncingChatHandlerProvider.class)
    );
    final MapBinder<String, ChatHandlerProvider> handlerProviderBinder =
        PolyBind.optionBinder(binder, Key.get(ChatHandlerProvider.class));
    handlerProviderBinder
        .addBinding("announce")
        .to(ServiceAnnouncingChatHandlerProvider.class)
        .in(LazySingleton.class);
    handlerProviderBinder
        .addBinding("noop")
        .to(NoopChatHandlerProvider.class)
        .in(LazySingleton.class);
    binder.bind(ServiceAnnouncingChatHandlerProvider.class).in(LazySingleton.class);
    binder.bind(NoopChatHandlerProvider.class).in(LazySingleton.class);
  }

  /**
   * Binds the segment killer, mover and archiver to their "Omni" implementations.
   */
  static void bindPeonDataSegmentHandlers(Binder binder)
  {
    // Build it to make it bind even if nothing binds to it.
    Binders.dataSegmentKillerBinder(binder);
    binder.bind(DataSegmentKiller.class).to(OmniDataSegmentKiller.class).in(LazySingleton.class);
    Binders.dataSegmentMoverBinder(binder);
    binder.bind(DataSegmentMover.class).to(OmniDataSegmentMover.class).in(LazySingleton.class);
    Binders.dataSegmentArchiverBinder(binder);
    binder.bind(DataSegmentArchiver.class).to(OmniDataSegmentArchiver.class).in(LazySingleton.class);
  }

  /**
   * Binds {@link TaskActionClientFactory} as a choice keyed by {@code druid.peon.mode}, with the options "local"
   * ({@link LocalTaskActionClientFactory}) and "remote" ({@link RemoteTaskActionClientFactory}, the default).
   * It also binds this process's {@link NodeRole} as {@link NodeRole#PEON}.
   */
  private static void configureTaskActionClient(Binder binder)
  {
    PolyBind.createChoice(
        binder,
        "druid.peon.mode",
        Key.get(TaskActionClientFactory.class),
        Key.get(RemoteTaskActionClientFactory.class)
    );
    final MapBinder<String, TaskActionClientFactory> taskActionBinder =
        PolyBind.optionBinder(binder, Key.get(TaskActionClientFactory.class));
    taskActionBinder
        .addBinding("local")
        .to(LocalTaskActionClientFactory.class)
        .in(LazySingleton.class);
    // all of these bindings are so that we can run the peon in local mode
    JsonConfigProvider.bind(binder, "druid.indexer.storage", TaskStorageConfig.class);
    binder.bind(TaskStorage.class).to(HeapMemoryTaskStorage.class).in(LazySingleton.class);
    binder.bind(TaskActionToolbox.class).in(LazySingleton.class);
    binder.bind(IndexerMetadataStorageCoordinator.class)
          .to(IndexerSQLMetadataStorageCoordinator.class)
          .in(LazySingleton.class);
    taskActionBinder
        .addBinding("remote")
        .to(RemoteTaskActionClientFactory.class)
        .in(LazySingleton.class);

    binder.bind(NodeRole.class).annotatedWith(Self.class).toInstance(NodeRole.PEON);
  }

  /**
   * Binds task configuration, the task-action client and the indexing-service, shuffle and parallel-index clients.
   */
  static void bindTaskConfigAndClients(Binder binder)
  {
    binder.bind(TaskToolboxFactory.class).in(LazySingleton.class);

    JsonConfigProvider.bind(binder, "druid.indexer.task", TaskConfig.class);
    JsonConfigProvider.bind(binder, "druid.indexer.auditlog", TaskAuditLogConfig.class);
    JsonConfigProvider.bind(binder, "druid.peon.taskActionClient.retry", RetryPolicyConfig.class);

    configureTaskActionClient(binder);
    binder.bind(IndexingServiceClient.class).to(HttpIndexingServiceClient.class).in(LazySingleton.class);
    binder.bind(ShuffleClient.class).to(HttpShuffleClient.class).in(LazySingleton.class);

    binder.bind(new TypeLiteral<IndexTaskClientFactory<ParallelIndexSupervisorTaskClient>>(){})
          .to(ParallelIndexTaskClientFactory.class)
          .in(LazySingleton.class);

    binder.bind(RetryPolicyFactory.class).in(LazySingleton.class);
  }

  /**
   * Binds the cache configuration under {@code druid.realtime.cache} and installs {@link CacheModule}.
   */
  static void bindRealtimeCache(Binder binder)
  {
    JsonConfigProvider.bind(binder, "druid.realtime.cache", CacheConfig.class);
    binder.install(new CacheModule());
  }

  /**
   * Binds the coordinator-based segment handoff notifier, configured under {@code druid.segment.handoff}, and
   * the {@link CoordinatorClient}.
   */
  static void bindCoordinatorHandoffNotiferAndClient(Binder binder)
  {
    JsonConfigProvider.bind(
        binder,
        "druid.segment.handoff",
        CoordinatorBasedSegmentHandoffNotifierConfig.class
    );
    binder.bind(SegmentHandoffNotifierFactory.class)
          .to(CoordinatorBasedSegmentHandoffNotifierFactory.class)
          .in(LazySingleton.class);

    binder.bind(CoordinatorClient.class).in(LazySingleton.class);
  }
}