blob: e98c95f8505cb33c544a8a6be3fd469bd74a89ae [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.cli;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Inject;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.TypeLiteral;
import com.google.inject.multibindings.MapBinder;
import com.google.inject.name.Names;
import com.google.inject.util.Providers;
import io.airlift.airline.Command;
import org.apache.druid.client.indexing.HttpIndexingServiceClient;
import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.curator.ZkEnablementConfig;
import org.apache.druid.discovery.NodeRole;
import org.apache.druid.discovery.WorkerNodeService;
import org.apache.druid.guice.IndexingServiceFirehoseModule;
import org.apache.druid.guice.IndexingServiceInputSourceModule;
import org.apache.druid.guice.IndexingServiceModuleHelper;
import org.apache.druid.guice.IndexingServiceTaskLogsModule;
import org.apache.druid.guice.IndexingServiceTuningConfigModule;
import org.apache.druid.guice.Jerseys;
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.LifecycleModule;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.guice.PolyBind;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.IndexTaskClientFactory;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTaskClient;
import org.apache.druid.indexing.common.task.batch.parallel.ShuffleClient;
import org.apache.druid.indexing.overlord.ForkingTaskRunner;
import org.apache.druid.indexing.overlord.TaskRunner;
import org.apache.druid.indexing.worker.Worker;
import org.apache.druid.indexing.worker.WorkerCuratorCoordinator;
import org.apache.druid.indexing.worker.WorkerTaskManager;
import org.apache.druid.indexing.worker.WorkerTaskMonitor;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.indexing.worker.http.TaskManagementResource;
import org.apache.druid.indexing.worker.http.WorkerResource;
import org.apache.druid.indexing.worker.shuffle.DeepStorageIntermediaryDataManager;
import org.apache.druid.indexing.worker.shuffle.IntermediaryDataManager;
import org.apache.druid.indexing.worker.shuffle.LocalIntermediaryDataManager;
import org.apache.druid.indexing.worker.shuffle.ShuffleModule;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.metadata.input.InputSourceModule;
import org.apache.druid.query.lookup.LookupSerdeModule;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.appenderator.DummyForInjectionAppenderatorsManager;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.http.SelfDiscoveryResource;
import org.apache.druid.server.initialization.jetty.JettyServerInitializer;
import org.apache.druid.timeline.PruneLastCompactionState;
import org.eclipse.jetty.server.Server;
import java.util.List;
import java.util.Properties;
/**
*
*/
@Command(
name = "middleManager",
description = "Runs a Middle Manager, this is a \"task\" node used as part of the remote indexing service, see https://druid.apache.org/docs/latest/design/middlemanager.html for a description"
)
public class CliMiddleManager extends ServerRunnable
{
private static final Logger log = new Logger(CliMiddleManager.class);
private boolean isZkEnabled = true;
public CliMiddleManager()
{
super(log);
}
@Inject
public void configure(Properties properties)
{
isZkEnabled = ZkEnablementConfig.isEnabled(properties);
}
@Override
protected List<? extends Module> getModules()
{
return ImmutableList.of(
new Module()
{
@Override
public void configure(Binder binder)
{
binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/middlemanager");
binder.bindConstant().annotatedWith(Names.named("servicePort")).to(8091);
binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(8291);
binder.bindConstant().annotatedWith(PruneLastCompactionState.class).to(true);
IndexingServiceModuleHelper.configureTaskRunnerConfigs(binder);
JsonConfigProvider.bind(binder, "druid.indexer.task", TaskConfig.class);
JsonConfigProvider.bind(binder, "druid.worker", WorkerConfig.class);
binder.bind(TaskRunner.class).to(ForkingTaskRunner.class);
binder.bind(ForkingTaskRunner.class).in(LazySingleton.class);
binder.bind(IndexingServiceClient.class).to(HttpIndexingServiceClient.class).in(LazySingleton.class);
binder.bind(new TypeLiteral<IndexTaskClientFactory<ParallelIndexSupervisorTaskClient>>() {})
.toProvider(Providers.of(null));
binder.bind(ShuffleClient.class).toProvider(Providers.of(null));
binder.bind(ChatHandlerProvider.class).toProvider(Providers.of(new NoopChatHandlerProvider()));
PolyBind.createChoice(
binder,
"druid.indexer.task.rowIngestionMeters.type",
Key.get(RowIngestionMetersFactory.class),
Key.get(DropwizardRowIngestionMetersFactory.class)
);
final MapBinder<String, RowIngestionMetersFactory> rowIngestionMetersHandlerProviderBinder =
PolyBind.optionBinder(binder, Key.get(RowIngestionMetersFactory.class));
rowIngestionMetersHandlerProviderBinder
.addBinding("dropwizard")
.to(DropwizardRowIngestionMetersFactory.class)
.in(LazySingleton.class);
binder.bind(DropwizardRowIngestionMetersFactory.class).in(LazySingleton.class);
bindWorkerManagementClasses(binder, isZkEnabled);
binder.bind(JettyServerInitializer.class)
.to(MiddleManagerJettyServerInitializer.class)
.in(LazySingleton.class);
binder.bind(AppenderatorsManager.class)
.to(DummyForInjectionAppenderatorsManager.class)
.in(LazySingleton.class);
LifecycleModule.register(binder, Server.class);
bindNodeRoleAndAnnouncer(
binder,
DiscoverySideEffectsProvider
.builder(NodeRole.MIDDLE_MANAGER)
.serviceClasses(ImmutableList.of(WorkerNodeService.class))
.build()
);
Jerseys.addResource(binder, SelfDiscoveryResource.class);
LifecycleModule.registerKey(binder, Key.get(SelfDiscoveryResource.class));
configureIntermediaryData(binder);
}
private void configureIntermediaryData(Binder binder)
{
PolyBind.createChoice(
binder,
"druid.processing.intermediaryData.storage.type",
Key.get(IntermediaryDataManager.class),
Key.get(LocalIntermediaryDataManager.class)
);
final MapBinder<String, IntermediaryDataManager> biddy = PolyBind.optionBinder(
binder,
Key.get(IntermediaryDataManager.class)
);
biddy.addBinding("local").to(LocalIntermediaryDataManager.class);
biddy.addBinding("deepstore").to(DeepStorageIntermediaryDataManager.class).in(LazySingleton.class);
}
@Provides
@LazySingleton
public Worker getWorker(@Self DruidNode node, WorkerConfig config)
{
return new Worker(
node.getServiceScheme(),
node.getHostAndPortToUse(),
config.getIp(),
config.getCapacity(),
config.getVersion(),
config.getCategory()
);
}
@Provides
@LazySingleton
public WorkerNodeService getWorkerNodeService(WorkerConfig workerConfig)
{
return new WorkerNodeService(
workerConfig.getIp(),
workerConfig.getCapacity(),
workerConfig.getVersion(),
workerConfig.getCategory()
);
}
},
new ShuffleModule(),
new IndexingServiceFirehoseModule(),
new IndexingServiceInputSourceModule(),
new IndexingServiceTaskLogsModule(),
new IndexingServiceTuningConfigModule(),
new InputSourceModule(),
new LookupSerdeModule()
);
}
public static void bindWorkerManagementClasses(Binder binder, boolean isZkEnabled)
{
if (isZkEnabled) {
binder.bind(WorkerTaskManager.class).to(WorkerTaskMonitor.class);
binder.bind(WorkerTaskMonitor.class).in(ManageLifecycle.class);
binder.bind(WorkerCuratorCoordinator.class).in(ManageLifecycle.class);
LifecycleModule.register(binder, WorkerTaskMonitor.class);
} else {
binder.bind(WorkerTaskManager.class).in(ManageLifecycle.class);
}
Jerseys.addResource(binder, WorkerResource.class);
Jerseys.addResource(binder, TaskManagementResource.class);
}
}