blob: 21db6d76590a4e6eb9ad48d97d2801ced6e85a26 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gobblin.service.modules.core;
import java.util.Objects;
import org.apache.helix.HelixManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
import com.google.common.eventbus.EventBus;
import com.google.inject.Binder;
import com.google.inject.Inject;
import com.google.inject.Injector;
import com.google.inject.Module;
import com.google.inject.Provider;
import com.google.inject.multibindings.OptionalBinder;
import com.google.inject.name.Names;
import com.typesafe.config.Config;
import javax.inject.Singleton;
import org.apache.gobblin.restli.EmbeddedRestliServer;
import org.apache.gobblin.runtime.api.GobblinInstanceEnvironment;
import org.apache.gobblin.runtime.instance.StandardGobblinInstanceLauncher;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog;
import org.apache.gobblin.runtime.troubleshooter.InMemoryMultiContextIssueRepository;
import org.apache.gobblin.runtime.troubleshooter.JobIssueEventHandler;
import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
import org.apache.gobblin.scheduler.SchedulerService;
import org.apache.gobblin.service.FlowConfigResourceLocalHandler;
import org.apache.gobblin.service.FlowConfigV2ResourceLocalHandler;
import org.apache.gobblin.service.FlowConfigsResource;
import org.apache.gobblin.service.FlowConfigsResourceHandler;
import org.apache.gobblin.service.FlowConfigsV2Resource;
import org.apache.gobblin.service.FlowConfigsV2ResourceHandler;
import org.apache.gobblin.service.FlowExecutionResource;
import org.apache.gobblin.service.FlowExecutionResourceHandler;
import org.apache.gobblin.service.FlowExecutionResourceLocalHandler;
import org.apache.gobblin.service.FlowStatusResource;
import org.apache.gobblin.service.GroupOwnershipService;
import org.apache.gobblin.service.NoopRequesterService;
import org.apache.gobblin.service.RequesterService;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.orchestration.DagManager;
import org.apache.gobblin.service.modules.orchestration.Orchestrator;
import org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigResourceHandler;
import org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigV2ResourceHandler;
import org.apache.gobblin.service.modules.restli.GobblinServiceFlowExecutionResourceHandler;
import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
import org.apache.gobblin.service.modules.topology.TopologySpecFactory;
import org.apache.gobblin.service.modules.utils.HelixUtils;
import org.apache.gobblin.service.modules.utils.InjectionNames;
import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
import org.apache.gobblin.service.monitoring.FsJobStatusRetriever;
import org.apache.gobblin.service.monitoring.JobStatusRetriever;
import org.apache.gobblin.service.monitoring.KafkaJobStatusMonitor;
import org.apache.gobblin.service.monitoring.KafkaJobStatusMonitorFactory;
import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ConfigUtils;
public class GobblinServiceGuiceModule implements Module {
private static final Logger LOGGER = LoggerFactory.getLogger(GobblinServiceGuiceModule.class);
private static final String JOB_STATUS_RETRIEVER_CLASS_KEY = "jobStatusRetriever.class";
GobblinServiceConfiguration serviceConfig;
public GobblinServiceGuiceModule(GobblinServiceConfiguration serviceConfig) {
this.serviceConfig = Objects.requireNonNull(serviceConfig);
}
@Override
public void configure(Binder binder) {
LOGGER.info("Configuring bindings for the following service settings: {}", serviceConfig);
// In the current code base, we frequently inject classes instead of interfaces
// As a result, even when the binding is missing, Guice will create an instance of the
// the class and inject it. This interferes with disabling of different services and
// components, because without explicit bindings they will get instantiated anyway.
binder.requireExplicitBindings();
// Optional binder will find the existing binding for T and create additional binding for Optional<T>.
// If none of the specific class binding exist, optional will be "absent".
OptionalBinder.newOptionalBinder(binder, Logger.class);
binder.bind(Logger.class).toInstance(LoggerFactory.getLogger(GobblinServiceManager.class));
binder.bind(Config.class).toInstance(serviceConfig.getInnerConfig());
binder.bind(GobblinServiceConfiguration.class).toInstance(serviceConfig);
// Used by TopologyCatalog and FlowCatalog
GobblinInstanceEnvironment gobblinInstanceEnvironment = StandardGobblinInstanceLauncher.builder()
.withLog(LoggerFactory.getLogger(GobblinServiceManager.class))
.setInstrumentationEnabled(true)
.withSysConfig(serviceConfig.getInnerConfig())
.build();
binder.bind(GobblinInstanceEnvironment.class).toInstance(gobblinInstanceEnvironment);
binder.bind(EventBus.class)
.annotatedWith(Names.named(GobblinServiceManager.SERVICE_EVENT_BUS_NAME))
.toInstance(new EventBus(GobblinServiceManager.class.getSimpleName()));
binder.bindConstant().annotatedWith(Names.named(InjectionNames.SERVICE_NAME)).to(serviceConfig.getServiceName());
binder.bindConstant()
.annotatedWith(Names.named(InjectionNames.FORCE_LEADER))
.to(ConfigUtils.getBoolean(serviceConfig.getInnerConfig(), ServiceConfigKeys.FORCE_LEADER,
ServiceConfigKeys.DEFAULT_FORCE_LEADER));
binder.bindConstant()
.annotatedWith(Names.named(InjectionNames.FLOW_CATALOG_LOCAL_COMMIT))
.to(serviceConfig.getServiceName());
binder.bind(FlowConfigsResourceHandler.class).to(GobblinServiceFlowConfigResourceHandler.class);
binder.bind(FlowConfigsV2ResourceHandler.class).to(GobblinServiceFlowConfigV2ResourceHandler.class);
binder.bind(FlowExecutionResourceHandler.class).to(GobblinServiceFlowExecutionResourceHandler.class);
binder.bind(FlowConfigsResource.class);
binder.bind(FlowConfigsV2Resource.class);
binder.bind(FlowStatusResource.class);
binder.bind(FlowExecutionResource.class);
binder.bind(FlowConfigResourceLocalHandler.class);
binder.bind(FlowConfigV2ResourceLocalHandler.class);
binder.bind(FlowExecutionResourceLocalHandler.class);
binder.bindConstant().annotatedWith(Names.named(FlowConfigsResource.INJECT_READY_TO_USE)).to(Boolean.TRUE);
binder.bindConstant().annotatedWith(Names.named(FlowConfigsV2Resource.INJECT_READY_TO_USE)).to(Boolean.TRUE);
binder.bind(RequesterService.class)
.to(NoopRequesterService.class);
OptionalBinder.newOptionalBinder(binder, TopologyCatalog.class);
if (serviceConfig.isTopologyCatalogEnabled()) {
binder.bind(TopologyCatalog.class);
}
if (serviceConfig.isTopologySpecFactoryEnabled()) {
binder.bind(TopologySpecFactory.class)
.to(getClassByNameOrAlias(TopologySpecFactory.class, serviceConfig.getInnerConfig(),
ServiceConfigKeys.TOPOLOGYSPEC_FACTORY_KEY, ServiceConfigKeys.DEFAULT_TOPOLOGY_SPEC_FACTORY));
}
OptionalBinder.newOptionalBinder(binder, DagManager.class);
if (serviceConfig.isDagManagerEnabled()) {
binder.bind(DagManager.class);
}
OptionalBinder.newOptionalBinder(binder, HelixManager.class);
if (serviceConfig.isHelixManagerEnabled()) {
binder.bind(HelixManager.class)
.toInstance(buildHelixManager(serviceConfig.getInnerConfig(),
serviceConfig.getInnerConfig().getString(ServiceConfigKeys.ZK_CONNECTION_STRING_KEY)));
} else {
LOGGER.info("No ZooKeeper connection string. Running in single instance mode.");
}
OptionalBinder.newOptionalBinder(binder, FlowCatalog.class);
if (serviceConfig.isFlowCatalogEnabled()) {
binder.bind(FlowCatalog.class);
}
if (serviceConfig.isJobStatusMonitorEnabled()) {
binder.bind(KafkaJobStatusMonitor.class).toProvider(KafkaJobStatusMonitorFactory.class).in(Singleton.class);
}
binder.bind(FlowStatusGenerator.class);
if (serviceConfig.isSchedulerEnabled()) {
binder.bind(Orchestrator.class);
binder.bind(SchedulerService.class);
binder.bind(GobblinServiceJobScheduler.class);
}
if (serviceConfig.isGitConfigMonitorEnabled()) {
binder.bind(GitConfigMonitor.class);
}
binder.bind(GroupOwnershipService.class)
.to(getClassByNameOrAlias(GroupOwnershipService.class, serviceConfig.getInnerConfig(),
ServiceConfigKeys.GROUP_OWNERSHIP_SERVICE_CLASS, ServiceConfigKeys.DEFAULT_GROUP_OWNERSHIP_SERVICE));
binder.bind(JobStatusRetriever.class)
.to(getClassByNameOrAlias(JobStatusRetriever.class, serviceConfig.getInnerConfig(),
JOB_STATUS_RETRIEVER_CLASS_KEY, FsJobStatusRetriever.class.getName()));
if (serviceConfig.isRestLIServerEnabled()) {
binder.bind(EmbeddedRestliServer.class).toProvider(EmbeddedRestliServerProvider.class).in(Singleton.class);
}
binder.bind(GobblinServiceManager.class);
binder.bind(MultiContextIssueRepository.class).to(InMemoryMultiContextIssueRepository.class);
binder.bind(JobIssueEventHandler.class);
binder.bind(D2Announcer.class).to(NoopD2Announcer.class);
LOGGER.info("Bindings configured");
}
protected HelixManager buildHelixManager(Config config, String zkConnectionString) {
String helixClusterName = config.getString(ServiceConfigKeys.HELIX_CLUSTER_NAME_KEY);
String helixInstanceName = HelixUtils.buildHelixInstanceName(config, GobblinServiceManager.class.getSimpleName());
LOGGER.info(
"Creating Helix cluster if not already present [overwrite = false]: " + zkConnectionString);
HelixUtils.createGobblinHelixCluster(zkConnectionString, helixClusterName, false);
return HelixUtils.buildHelixManager(helixInstanceName, helixClusterName, zkConnectionString);
}
protected static <T> Class<? extends T> getClassByNameOrAlias(Class<T> baseClass, Config config,
String classPropertyName, String defaultClass) {
String className = ConfigUtils.getString(config, classPropertyName, defaultClass);
ClassAliasResolver<T> aliasResolver = new ClassAliasResolver<T>(baseClass);
try {
return (Class<? extends T>) Class.forName(aliasResolver.resolve(className));
} catch (ClassNotFoundException e) {
throw new RuntimeException(
"Cannot resolve the class '" + className + "'. Check that property '" + classPropertyName
+ "' points to a valid class name or alias.", e);
}
}
public static class EmbeddedRestliServerProvider implements Provider<EmbeddedRestliServer> {
Injector injector;
@Inject
public EmbeddedRestliServerProvider(Injector injector) {
this.injector = injector;
}
@Override
public EmbeddedRestliServer get() {
return EmbeddedRestliServer.builder()
.resources(Lists.newArrayList(FlowConfigsResource.class, FlowConfigsV2Resource.class))
.injector(injector)
.build();
}
}
}