blob: 6212cb8b9b7896600d9e6ff62998feca29059c4d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.cli;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.rvesse.airline.annotations.Arguments;
import com.github.rvesse.airline.annotations.Command;
import com.github.rvesse.airline.annotations.Option;
import com.github.rvesse.airline.annotations.restrictions.Required;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Binder;
import com.google.inject.Inject;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.multibindings.MapBinder;
import com.google.inject.name.Named;
import com.google.inject.name.Names;
import io.netty.util.SuppressForbidden;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.curator.ZkEnablementConfig;
import org.apache.druid.discovery.NodeRole;
import org.apache.druid.error.DruidException;
import org.apache.druid.guice.Binders;
import org.apache.druid.guice.CacheModule;
import org.apache.druid.guice.DruidProcessingModule;
import org.apache.druid.guice.IndexingServiceFirehoseModule;
import org.apache.druid.guice.IndexingServiceInputSourceModule;
import org.apache.druid.guice.IndexingServiceTaskLogsModule;
import org.apache.druid.guice.IndexingServiceTuningConfigModule;
import org.apache.druid.guice.Jerseys;
import org.apache.druid.guice.JoinableFactoryModule;
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.LifecycleModule;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.guice.ManageLifecycleServer;
import org.apache.druid.guice.PolyBind;
import org.apache.druid.guice.QueryRunnerFactoryModule;
import org.apache.druid.guice.QueryableModule;
import org.apache.druid.guice.QueryablePeonModule;
import org.apache.druid.guice.SegmentWranglerModule;
import org.apache.druid.guice.ServerTypeConfig;
import org.apache.druid.guice.ServerViewModule;
import org.apache.druid.guice.annotations.AttemptId;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.guice.annotations.Parent;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.indexer.report.SingleFileTaskReportFileWriter;
import org.apache.druid.indexer.report.TaskReportFileWriter;
import org.apache.druid.indexing.common.RetryPolicyConfig;
import org.apache.druid.indexing.common.RetryPolicyFactory;
import org.apache.druid.indexing.common.TaskToolboxFactory;
import org.apache.druid.indexing.common.actions.LocalTaskActionClientFactory;
import org.apache.druid.indexing.common.actions.RemoteTaskActionClientFactory;
import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
import org.apache.druid.indexing.common.actions.TaskActionToolbox;
import org.apache.druid.indexing.common.actions.TaskAuditLogConfig;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskStorageConfig;
import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.batch.parallel.DeepStorageShuffleClient;
import org.apache.druid.indexing.common.task.batch.parallel.HttpShuffleClient;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTaskClientProvider;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTaskClientProviderImpl;
import org.apache.druid.indexing.common.task.batch.parallel.ShuffleClient;
import org.apache.druid.indexing.overlord.HeapMemoryTaskStorage;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner;
import org.apache.druid.indexing.overlord.TaskRunner;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.worker.executor.ExecutorLifecycle;
import org.apache.druid.indexing.worker.executor.ExecutorLifecycleConfig;
import org.apache.druid.indexing.worker.shuffle.DeepStorageIntermediaryDataManager;
import org.apache.druid.indexing.worker.shuffle.IntermediaryDataManager;
import org.apache.druid.indexing.worker.shuffle.LocalIntermediaryDataManager;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.lifecycle.Lifecycle;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
import org.apache.druid.metadata.input.InputSourceModule;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.query.lookup.LookupModule;
import org.apache.druid.segment.handoff.CoordinatorBasedSegmentHandoffNotifierConfig;
import org.apache.druid.segment.handoff.CoordinatorBasedSegmentHandoffNotifierFactory;
import org.apache.druid.segment.handoff.SegmentHandoffNotifierFactory;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.loading.DataSegmentArchiver;
import org.apache.druid.segment.loading.DataSegmentKiller;
import org.apache.druid.segment.loading.DataSegmentMover;
import org.apache.druid.segment.loading.OmniDataSegmentArchiver;
import org.apache.druid.segment.loading.OmniDataSegmentKiller;
import org.apache.druid.segment.loading.OmniDataSegmentMover;
import org.apache.druid.segment.loading.StorageLocation;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.appenderator.PeonAppenderatorsManager;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
import org.apache.druid.segment.realtime.firehose.ServiceAnnouncingChatHandlerProvider;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.ResponseContextConfig;
import org.apache.druid.server.SegmentManager;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordination.ZkCoordinator;
import org.apache.druid.server.http.HistoricalResource;
import org.apache.druid.server.http.SegmentListerResource;
import org.apache.druid.server.initialization.jetty.ChatHandlerServerModule;
import org.apache.druid.server.initialization.jetty.JettyServerInitializer;
import org.apache.druid.server.metrics.DataSourceTaskIdHolder;
import org.apache.druid.server.metrics.ServiceStatusMonitor;
import org.apache.druid.tasklogs.TaskPayloadManager;
import org.eclipse.jetty.server.Server;
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
/**
 * CLI entry point for the "peon" server type: the individual forked JVM process that the
 * indexing service uses to execute a single task. This command wires up the Guice modules,
 * lifecycle ordering, and bindings needed to read one task definition ({@code task.json}),
 * run it via {@link SingleTaskBackgroundRunner}, write status/report files, and exit.
 */
@Command(
    name = "peon",
    description = "Runs a Peon, this is an individual forked \"task\" used as part of the indexing service. "
                  + "This should rarely, if ever, be used directly. "
                  + "See https://druid.apache.org/docs/latest/design/peons.html for a description"
)
public class CliPeon extends GuiceRunnable
{
  // Positional CLI arguments: element [0] is the task directory path, element [1] is the attempt id.
  // Both are unpacked into the fields below inside getModules().
  @SuppressWarnings("WeakerAccess")
  @Required
  @Arguments(description = "taskDirPath attemptId")
  public List<String> taskAndStatusFile;

  // Path to the task directory (taskAndStatusFile.get(0)); populated in getModules().
  private String taskDirPath;

  // The attempt id (taskAndStatusFile.get(1)); populated in getModules().
  private String attemptId;

  /**
   * Still using --nodeType as the flag for backward compatibility, although the concept is now more precisely called
   * "serverType".
   */
  @Option(name = "--nodeType", title = "nodeType", description = "Set the node type to expose on ZK")
  public String serverType = "indexer-executor";

  // Whether ZooKeeper is enabled for this process; derived from runtime properties in configure().
  private boolean isZkEnabled = true;

  /**
   * If set to "true", the peon will bind classes necessary for loading broadcast segments. This is used for
   * queryable tasks, such as streaming ingestion tasks.
   */
  @Option(name = "--loadBroadcastSegments", title = "loadBroadcastSegments", description = "Enable loading of broadcast segments")
  public String loadBroadcastSegments = "false";

  // Used by readTask() to fetch the task payload from deep storage when the local task file is
  // missing or empty (e.g. when the payload was not shipped with the forked process).
  @Option(name = "--taskId", title = "taskId", description = "TaskId for fetching task.json remotely")
  public String taskId = "";

  private static final Logger log = new Logger(CliPeon.class);

  // Runtime properties injected via configure(); consulted when building modules in getModules().
  private Properties properties;

  public CliPeon()
  {
    super(log);
  }

  /**
   * Receives the resolved runtime properties from the injector and derives the ZK-enabled flag.
   *
   * @param properties fully-resolved Druid runtime properties
   */
  @Inject
  public void configure(Properties properties)
  {
    this.properties = properties;
    isZkEnabled = ZkEnablementConfig.isEnabled(properties);
  }

  /**
   * Builds the full set of Guice modules for the peon process. The anonymous {@link Module} in the
   * middle carries all peon-specific bindings; the surrounding named modules provide query
   * processing, ingestion, chat-handler, and lookup support.
   */
  @Override
  protected List<? extends Module> getModules()
  {
    return ImmutableList.of(
        new DruidProcessingModule(),
        new QueryableModule(),
        new QueryRunnerFactoryModule(),
        new SegmentWranglerModule(),
        new JoinableFactoryModule(),
        new IndexingServiceTaskLogsModule(),
        new Module()
        {
          @SuppressForbidden(reason = "System#out, System#err")
          @Override
          public void configure(Binder binder)
          {
            // Unpack the positional CLI arguments into the outer-class fields. This runs at
            // injector-creation time, so the fields are set before any @Provides method below uses them.
            taskDirPath = taskAndStatusFile.get(0);
            attemptId = taskAndStatusFile.get(1);

            // Fail fast: the centralized datasource schema feature requires HTTP-based server view.
            String serverViewType = (String) properties.getOrDefault(
                ServerViewModule.SERVERVIEW_TYPE_PROPERTY,
                ServerViewModule.DEFAULT_SERVERVIEW_TYPE
            );
            if (Boolean.parseBoolean(properties.getProperty(CliCoordinator.CENTRALIZED_DATASOURCE_SCHEMA_ENABLED))
                && !serverViewType.equals(ServerViewModule.SERVERVIEW_TYPE_HTTP)) {
              throw DruidException
                  .forPersona(DruidException.Persona.ADMIN)
                  .ofCategory(DruidException.Category.UNSUPPORTED)
                  .build(
                      StringUtils.format(
                          "CentralizedDatasourceSchema feature is incompatible with config %1$s=%2$s. "
                          + "Please consider switching to http based segment discovery (set %1$s=%3$s) "
                          + "or disable the feature (set %4$s=false).",
                          ServerViewModule.SERVERVIEW_TYPE_PROPERTY,
                          serverViewType,
                          ServerViewModule.SERVERVIEW_TYPE_HTTP,
                          CliCoordinator.CENTRALIZED_DATASOURCE_SCHEMA_ENABLED
                      ));
            }

            // Service identity. servicePort 0 / tlsServicePort -1 — presumably "auto-assign" and
            // "disabled" respectively; confirm against the port-binding logic elsewhere in the repo.
            binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/peon");
            binder.bindConstant().annotatedWith(Names.named("servicePort")).to(0);
            binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(-1);
            binder.bind(ResponseContextConfig.class).toInstance(ResponseContextConfig.newConfig(true));
            binder.bindConstant().annotatedWith(AttemptId.class).to(attemptId);

            JsonConfigProvider.bind(binder, "druid.centralizedDatasourceSchema", CentralizedDatasourceSchemaConfig.class);
            // The parent (forking) process's DruidNode, configured under druid.task.executor.
            JsonConfigProvider.bind(binder, "druid.task.executor", DruidNode.class, Parent.class);

            // Peon-specific binding groups; see the static helpers below for details.
            bindRowIngestionMeters(binder);
            bindChatHandler(binder);
            configureIntermediaryData(binder);
            bindTaskConfigAndClients(binder);
            bindPeonDataSegmentHandlers(binder);

            binder.bind(ExecutorLifecycle.class).in(ManageLifecycle.class);
            LifecycleModule.register(binder, ExecutorLifecycle.class);

            // Task definition and status live under the task directory:
            //   <taskDirPath>/task.json and <taskDirPath>/attempt/<attemptId>/status.json
            ExecutorLifecycleConfig executorLifecycleConfig = new ExecutorLifecycleConfig()
                .setTaskFile(Paths.get(taskDirPath, "task.json").toFile())
                .setStatusFile(Paths.get(taskDirPath, "attempt", attemptId, "status.json").toFile());

            // Under a k8s-based task runner there is no parent process stream to monitor.
            if (properties.getProperty("druid.indexer.runner.type", "").contains("k8s")) {
              log.info("Running peon in k8s mode");
              executorLifecycleConfig.setParentStreamDefined(false);
            }

            binder.bind(ExecutorLifecycleConfig.class).toInstance(executorLifecycleConfig);

            // Task reports are written to <taskDirPath>/attempt/<attemptId>/report.json.
            binder.bind(TaskReportFileWriter.class)
                  .toInstance(
                      new SingleFileTaskReportFileWriter(
                          Paths.get(taskDirPath, "attempt", attemptId, "report.json").toFile()
                      ));

            // A peon runs exactly one task; the single-task runner also serves queries against it.
            binder.bind(TaskRunner.class).to(SingleTaskBackgroundRunner.class);
            binder.bind(QuerySegmentWalker.class).to(SingleTaskBackgroundRunner.class);
            // Bind to ManageLifecycleServer to ensure SingleTaskBackgroundRunner is closed before
            // its dependent services, such as DiscoveryServiceLocator and OverlordClient.
            // This order ensures that tasks can finalize their cleanup operations before service location closure.
            binder.bind(SingleTaskBackgroundRunner.class).in(ManageLifecycleServer.class);

            bindRealtimeCache(binder);
            bindCoordinatorHandoffNotifer(binder);

            binder.bind(AppenderatorsManager.class)
                  .to(PeonAppenderatorsManager.class)
                  .in(LazySingleton.class);

            // HTTP server setup for the peon's query/chat endpoints.
            binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class);
            Jerseys.addResource(binder, SegmentListerResource.class);

            // serverType comes from the --nodeType flag (default "indexer-executor").
            binder.bind(ServerTypeConfig.class).toInstance(new ServerTypeConfig(ServerType.fromString(serverType)));
            LifecycleModule.register(binder, Server.class);

            // Only queryable tasks (e.g. streaming ingestion) need broadcast-segment loading.
            if ("true".equals(loadBroadcastSegments)) {
              binder.install(new BroadcastSegmentLoadingModule());
            }
          }

          /**
           * Provides the tag map attached to service heartbeat metrics: task id, datasource, task
           * type, group id, and (when present and non-empty) the task's context tags.
           */
          @Provides
          @LazySingleton
          @Named(ServiceStatusMonitor.HEARTBEAT_TAGS_BINDING)
          public Supplier<Map<String, Object>> heartbeatDimensions(Task task)
          {
            ImmutableMap.Builder<String, Object> builder = ImmutableMap.builder();
            builder.put(DruidMetrics.TASK_ID, task.getId());
            builder.put(DruidMetrics.DATASOURCE, task.getDataSource());
            builder.put(DruidMetrics.TASK_TYPE, task.getType());
            builder.put(DruidMetrics.GROUP_ID, task.getGroupId());
            Map<String, Object> tags = task.getContextValue(DruidMetrics.TAGS);
            if (tags != null && !tags.isEmpty()) {
              builder.put(DruidMetrics.TAGS, tags);
            }
            return Suppliers.ofInstance(
                builder.build()
            );
          }

          /**
           * Provides the {@link Task} this peon runs, deserialized from the configured task file.
           * If the task file is absent or empty, the payload is pulled from deep storage using
           * {@code --taskId} and written to the task file first, so ExecutorLifecycle can pick it up.
           *
           * @throws RuntimeException wrapping any {@link IOException} from read/fetch/write
           */
          @Provides
          @LazySingleton
          public Task readTask(@Json ObjectMapper mapper, ExecutorLifecycleConfig config, TaskPayloadManager taskPayloadManager)
          {
            try {
              if (!config.getTaskFile().exists() || config.getTaskFile().length() == 0) {
                log.info("Task file not found, trying to pull task payload from deep storage");
                // NOTE(review): uses the platform default charset for both read and write; as long
                // as both sides agree this round-trips, but UTF-8 would be the safer explicit choice.
                String task = IOUtils.toString(taskPayloadManager.streamTaskPayload(taskId).get(), Charset.defaultCharset());
                // write the remote task.json to the task file location for ExecutorLifecycle to pickup
                FileUtils.write(config.getTaskFile(), task, Charset.defaultCharset());
              }
              return mapper.readValue(config.getTaskFile(), Task.class);
            }
            catch (IOException e) {
              throw new RuntimeException(e);
            }
          }

          /** Exposes the running task's datasource name under the standard holder binding. */
          @Provides
          @LazySingleton
          @Named(DataSourceTaskIdHolder.DATA_SOURCE_BINDING)
          public String getDataSourceFromTask(final Task task)
          {
            return task.getDataSource();
          }

          /** Exposes the running task's id under the standard holder binding. */
          @Provides
          @LazySingleton
          @Named(DataSourceTaskIdHolder.TASK_ID_BINDING)
          public String getTaskIDFromTask(final Task task)
          {
            return task.getId();
          }
        },
        new QueryablePeonModule(),
        new IndexingServiceFirehoseModule(),
        new IndexingServiceInputSourceModule(),
        new IndexingServiceTuningConfigModule(),
        new InputSourceModule(),
        new ChatHandlerServerModule(properties),
        new LookupModule()
    );
  }

  /**
   * Runs the peon: creates the injector, starts the lifecycle, blocks until the task's
   * {@link ExecutorLifecycle} completes, then stops the lifecycle and exits. Any failure is
   * printed to stderr and the process exits with status 1.
   */
  @SuppressForbidden(reason = "System#out, System#err")
  @Override
  public void run()
  {
    try {
      Injector injector = makeInjector(ImmutableSet.of(NodeRole.PEON));
      try {
        final Lifecycle lifecycle = initLifecycle(injector);
        // Shutdown hook so the lifecycle still stops cleanly if the JVM is terminated externally.
        final Thread hook = new Thread(
            () -> {
              log.info("Running shutdown hook");
              lifecycle.stop();
            }
        );
        Runtime.getRuntime().addShutdownHook(hook);
        // Block until the task finishes.
        injector.getInstance(ExecutorLifecycle.class).join();

        // Sanity check to help debug unexpected non-daemon threads
        final Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
        for (Thread thread : threadSet) {
          if (!thread.isDaemon() && thread != Thread.currentThread()) {
            log.info("Thread [%s] is non daemon.", thread);
          }
        }

        // Explicitly call lifecycle stop, dont rely on shutdown hook.
        lifecycle.stop();
        try {
          Runtime.getRuntime().removeShutdownHook(hook);
        }
        catch (IllegalStateException e) {
          // The JVM is already shutting down; the hook will run (or has run) on its own.
          System.err.println("Cannot remove shutdown hook, already shutting down!");
        }
      }
      catch (Throwable t) {
        System.err.println("Error!");
        System.err.println(Throwables.getStackTraceAsString(t));
        System.exit(1);
      }
      System.out.println("Finished peon task");
    }
    catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Binds the {@link RowIngestionMetersFactory} choice keyed by
   * {@code druid.indexer.task.rowIngestionMeters.type}; "dropwizard" is the only (and default) option.
   */
  static void bindRowIngestionMeters(Binder binder)
  {
    PolyBind.createChoice(
        binder,
        "druid.indexer.task.rowIngestionMeters.type",
        Key.get(RowIngestionMetersFactory.class),
        Key.get(DropwizardRowIngestionMetersFactory.class)
    );
    final MapBinder<String, RowIngestionMetersFactory> rowIngestionMetersHandlerProviderBinder =
        PolyBind.optionBinder(binder, Key.get(RowIngestionMetersFactory.class));
    rowIngestionMetersHandlerProviderBinder
        .addBinding("dropwizard")
        .to(DropwizardRowIngestionMetersFactory.class)
        .in(LazySingleton.class);
    binder.bind(DropwizardRowIngestionMetersFactory.class).in(LazySingleton.class);
  }

  /**
   * Binds the {@link ChatHandlerProvider} choice keyed by {@code druid.indexer.task.chathandler.type}:
   * "announce" (default, service-announcing) or "noop".
   */
  static void bindChatHandler(Binder binder)
  {
    PolyBind.createChoice(
        binder,
        "druid.indexer.task.chathandler.type",
        Key.get(ChatHandlerProvider.class),
        Key.get(ServiceAnnouncingChatHandlerProvider.class)
    );
    final MapBinder<String, ChatHandlerProvider> handlerProviderBinder =
        PolyBind.optionBinder(binder, Key.get(ChatHandlerProvider.class));
    handlerProviderBinder
        .addBinding("announce")
        .to(ServiceAnnouncingChatHandlerProvider.class)
        .in(LazySingleton.class);
    handlerProviderBinder
        .addBinding("noop")
        .to(NoopChatHandlerProvider.class)
        .in(LazySingleton.class);
    binder.bind(ServiceAnnouncingChatHandlerProvider.class).in(LazySingleton.class);
    binder.bind(NoopChatHandlerProvider.class).in(LazySingleton.class);
  }

  /**
   * Binds the "Omni" kill/move/archive segment handlers, which dispatch to the deep-storage
   * implementations registered via the corresponding {@code Binders} map binders.
   */
  static void bindPeonDataSegmentHandlers(Binder binder)
  {
    // Build it to make it bind even if nothing binds to it.
    Binders.dataSegmentKillerBinder(binder);
    binder.bind(DataSegmentKiller.class).to(OmniDataSegmentKiller.class).in(LazySingleton.class);
    Binders.dataSegmentMoverBinder(binder);
    binder.bind(DataSegmentMover.class).to(OmniDataSegmentMover.class).in(LazySingleton.class);
    Binders.dataSegmentArchiverBinder(binder);
    binder.bind(DataSegmentArchiver.class).to(OmniDataSegmentArchiver.class).in(LazySingleton.class);
  }

  /**
   * Binds the {@link TaskActionClientFactory} choice keyed by {@code druid.peon.mode}:
   * "remote" (default; actions go through the overlord) or "local" (actions executed in-process,
   * which requires the extra storage bindings below).
   */
  private static void configureTaskActionClient(Binder binder)
  {
    PolyBind.createChoice(
        binder,
        "druid.peon.mode",
        Key.get(TaskActionClientFactory.class),
        Key.get(RemoteTaskActionClientFactory.class)
    );
    final MapBinder<String, TaskActionClientFactory> taskActionBinder =
        PolyBind.optionBinder(binder, Key.get(TaskActionClientFactory.class));
    taskActionBinder
        .addBinding("local")
        .to(LocalTaskActionClientFactory.class)
        .in(LazySingleton.class);
    // all of these bindings are so that we can run the peon in local mode
    JsonConfigProvider.bind(binder, "druid.indexer.storage", TaskStorageConfig.class);
    binder.bind(TaskStorage.class).to(HeapMemoryTaskStorage.class).in(LazySingleton.class);
    binder.bind(TaskActionToolbox.class).in(LazySingleton.class);
    binder.bind(IndexerMetadataStorageCoordinator.class)
          .to(IndexerSQLMetadataStorageCoordinator.class)
          .in(LazySingleton.class);
    taskActionBinder
        .addBinding("remote")
        .to(RemoteTaskActionClientFactory.class)
        .in(LazySingleton.class);
    binder.bind(NodeRole.class).annotatedWith(Self.class).toInstance(NodeRole.PEON);
  }

  /**
   * Binds task configuration, retry policy, the task-action client, and the parallel-index
   * supervisor task client used for subtask coordination.
   */
  static void bindTaskConfigAndClients(Binder binder)
  {
    binder.bind(TaskToolboxFactory.class).in(LazySingleton.class);
    JsonConfigProvider.bind(binder, "druid.indexer.task", TaskConfig.class);
    JsonConfigProvider.bind(binder, "druid.indexer.auditlog", TaskAuditLogConfig.class);
    JsonConfigProvider.bind(binder, "druid.peon.taskActionClient.retry", RetryPolicyConfig.class);
    configureTaskActionClient(binder);
    binder.bind(ParallelIndexSupervisorTaskClientProvider.class)
          .to(ParallelIndexSupervisorTaskClientProviderImpl.class)
          .in(LazySingleton.class);
    binder.bind(RetryPolicyFactory.class).in(LazySingleton.class);
  }

  /** Binds the realtime query cache, configured under {@code druid.realtime.cache}. */
  static void bindRealtimeCache(Binder binder)
  {
    JsonConfigProvider.bind(binder, "druid.realtime.cache", CacheConfig.class);
    binder.install(new CacheModule());
  }

  /**
   * Binds the coordinator-based segment handoff notifier, configured under
   * {@code druid.segment.handoff}. (Method name preserves a historical "Notifer" typo;
   * it is referenced from other CLI classes, so renaming would be a breaking change.)
   */
  static void bindCoordinatorHandoffNotifer(Binder binder)
  {
    JsonConfigProvider.bind(
        binder,
        "druid.segment.handoff",
        CoordinatorBasedSegmentHandoffNotifierConfig.class
    );
    binder.bind(SegmentHandoffNotifierFactory.class)
          .to(CoordinatorBasedSegmentHandoffNotifierFactory.class)
          .in(LazySingleton.class);
  }

  /**
   * Binds the intermediary-data (shuffle) manager and shuffle client, both keyed by
   * {@code druid.processing.intermediaryData.storage.type}: "local" (default) or "deepstore".
   * Note that "local" intermediary storage pairs with the HTTP shuffle client.
   */
  static void configureIntermediaryData(Binder binder)
  {
    PolyBind.createChoice(
        binder,
        "druid.processing.intermediaryData.storage.type",
        Key.get(IntermediaryDataManager.class),
        Key.get(LocalIntermediaryDataManager.class)
    );
    final MapBinder<String, IntermediaryDataManager> intermediaryDataManagerBiddy = PolyBind.optionBinder(
        binder,
        Key.get(IntermediaryDataManager.class)
    );
    intermediaryDataManagerBiddy.addBinding("local").to(LocalIntermediaryDataManager.class).in(LazySingleton.class);
    intermediaryDataManagerBiddy.addBinding("deepstore").to(DeepStorageIntermediaryDataManager.class).in(LazySingleton.class);

    // Same property key intentionally drives both choices, so the shuffle client always matches
    // the storage type selected above.
    PolyBind.createChoice(
        binder,
        "druid.processing.intermediaryData.storage.type",
        Key.get(ShuffleClient.class),
        Key.get(HttpShuffleClient.class)
    );
    final MapBinder<String, ShuffleClient> shuffleClientBiddy = PolyBind.optionBinder(
        binder,
        Key.get(ShuffleClient.class)
    );
    shuffleClientBiddy.addBinding("local").to(HttpShuffleClient.class).in(LazySingleton.class);
    shuffleClientBiddy.addBinding("deepstore").to(DeepStorageShuffleClient.class).in(LazySingleton.class);
  }

  /**
   * Optional module (installed only when {@code --loadBroadcastSegments=true}) that binds the
   * segment manager, historical resource, and — when ZK is enabled — the ZK coordinator so a
   * queryable task can load broadcast segments.
   */
  public class BroadcastSegmentLoadingModule implements Module
  {
    @Override
    public void configure(Binder binder)
    {
      binder.bind(SegmentManager.class).in(LazySingleton.class);
      binder.bind(ZkCoordinator.class).in(ManageLifecycle.class);
      Jerseys.addResource(binder, HistoricalResource.class);
      if (isZkEnabled) {
        LifecycleModule.register(binder, ZkCoordinator.class);
      }
    }

    /**
     * Provides the single storage location for broadcast segments, rooted at
     * {@code <taskDirPath>/broadcast/segments} and capped by the task's tmp-storage byte limit.
     */
    @Provides
    @LazySingleton
    public List<StorageLocation> getCliPeonStorageLocations(TaskConfig config)
    {
      File broadcastStorage = new File(new File(taskDirPath, "broadcast"), "segments");
      return ImmutableList.of(new StorageLocation(broadcastStorage, config.getTmpStorageBytesPerTask(), null));
    }
  }
}