/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.runtime;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.MetricNameTemplate;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.common.config.provider.ConfigProvider;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.common.metrics.stats.Frequencies;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.connector.ConnectorContext;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.connector.policy.ConnectorClientConfigRequest;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.health.ConnectorType;
import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
import org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter;
import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
import org.apache.kafka.connect.runtime.errors.ErrorReporter;
import org.apache.kafka.connect.runtime.errors.LogReporter;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.isolation.Plugins.ClassLoaderUsage;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.storage.OffsetBackingStore;
import org.apache.kafka.connect.storage.OffsetStorageReader;
import org.apache.kafka.connect.storage.OffsetStorageReaderImpl;
import org.apache.kafka.connect.storage.OffsetStorageWriter;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.LoggingContext;
import org.apache.kafka.connect.util.SinkUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
/**
* <p>
* Worker runs a (dynamic) set of tasks in a set of threads, doing the work of actually moving
* data to/from Kafka.
* </p>
* <p>
* Since each task has a dedicated thread, this is mainly just a container for them.
* </p>
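* <p>
* A minimal lifecycle sketch (assuming an already-configured {@code WorkerConfig}, {@code Plugins},
* {@code OffsetBackingStore}, and override policy; all variable names are illustrative):
* </p>
* <pre>{@code
* Worker worker = new Worker("worker-1", Time.SYSTEM, plugins, workerConfig, offsetStore, overridePolicy);
* worker.start();
* // connectors and tasks are started on the worker's behalf by the herder...
* worker.stop();
* }</pre>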
*/
public class Worker {
private static final Logger log = LoggerFactory.getLogger(Worker.class);
protected Herder herder;
private final ExecutorService executor;
private final Time time;
private final String workerId;
private final Plugins plugins;
private final ConnectMetrics metrics;
private final WorkerMetricsGroup workerMetricsGroup;
private ConnectorStatusMetricsGroup connectorStatusMetricsGroup;
private final WorkerConfig config;
private final Converter internalKeyConverter;
private final Converter internalValueConverter;
private final OffsetBackingStore offsetBackingStore;
private final ConcurrentMap<String, WorkerConnector> connectors = new ConcurrentHashMap<>();
private final ConcurrentMap<ConnectorTaskId, WorkerTask> tasks = new ConcurrentHashMap<>();
private SourceTaskOffsetCommitter sourceTaskOffsetCommitter;
private WorkerConfigTransformer workerConfigTransformer;
private ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy;
public Worker(
String workerId,
Time time,
Plugins plugins,
WorkerConfig config,
OffsetBackingStore offsetBackingStore,
ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
this(workerId, time, plugins, config, offsetBackingStore, Executors.newCachedThreadPool(), connectorClientConfigOverridePolicy);
}
@SuppressWarnings("deprecation")
Worker(
String workerId,
Time time,
Plugins plugins,
WorkerConfig config,
OffsetBackingStore offsetBackingStore,
ExecutorService executorService,
ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy
) {
this.metrics = new ConnectMetrics(workerId, config, time);
this.executor = executorService;
this.workerId = workerId;
this.time = time;
this.plugins = plugins;
this.config = config;
this.connectorClientConfigOverridePolicy = connectorClientConfigOverridePolicy;
this.workerMetricsGroup = new WorkerMetricsGroup(metrics);
// Internal converters are required properties, thus getClass won't return null.
this.internalKeyConverter = plugins.newConverter(
config,
WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG,
ClassLoaderUsage.PLUGINS
);
this.internalValueConverter = plugins.newConverter(
config,
WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG,
ClassLoaderUsage.PLUGINS
);
this.offsetBackingStore = offsetBackingStore;
this.offsetBackingStore.configure(config);
this.workerConfigTransformer = initConfigTransformer();
}
private WorkerConfigTransformer initConfigTransformer() {
final List<String> providerNames = config.getList(WorkerConfig.CONFIG_PROVIDERS_CONFIG);
Map<String, ConfigProvider> providerMap = new HashMap<>();
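// Each provider name resolves to a "config.providers.<name>.class" worker property; for example
// (illustrative): config.providers=file and
// config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider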
for (String providerName : providerNames) {
ConfigProvider configProvider = plugins.newConfigProvider(
config,
WorkerConfig.CONFIG_PROVIDERS_CONFIG + "." + providerName,
ClassLoaderUsage.PLUGINS
);
providerMap.put(providerName, configProvider);
}
return new WorkerConfigTransformer(this, providerMap);
}
public WorkerConfigTransformer configTransformer() {
return workerConfigTransformer;
}
protected Herder herder() {
return herder;
}
/**
* Start worker.
*/
public void start() {
log.info("Worker starting");
offsetBackingStore.start();
sourceTaskOffsetCommitter = new SourceTaskOffsetCommitter(config);
connectorStatusMetricsGroup = new ConnectorStatusMetricsGroup(metrics, tasks, herder);
log.info("Worker started");
}
/**
* Stop worker.
*/
public void stop() {
log.info("Worker stopping");
long started = time.milliseconds();
long limit = started + config.getLong(WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG);
if (!connectors.isEmpty()) {
log.warn("Shutting down connectors {} uncleanly; herder should have shut down connectors before the Worker is stopped", connectors.keySet());
stopConnectors();
}
if (!tasks.isEmpty()) {
log.warn("Shutting down tasks {} uncleanly; herder should have shut down tasks before the Worker is stopped", tasks.keySet());
stopAndAwaitTasks();
}
long timeoutMs = limit - time.milliseconds();
sourceTaskOffsetCommitter.close(timeoutMs);
offsetBackingStore.stop();
metrics.stop();
log.info("Worker stopped");
workerMetricsGroup.close();
connectorStatusMetricsGroup.close();
}
/**
* Start a connector managed by this worker.
*
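* <p>A hedged example call (connector class and variable names are illustrative):
* <pre>{@code
* Map<String, String> props = new HashMap<>();
* props.put(ConnectorConfig.NAME_CONFIG, "my-source");
* props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "com.example.MySourceConnector");
* worker.startConnector("my-source", props, ctx, statusListener, TargetState.STARTED);
* }</pre>
*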
* @param connName the connector name.
* @param connProps the properties of the connector.
* @param ctx the connector runtime context.
* @param statusListener a listener for the runtime status transitions of the connector.
* @param initialState the initial state of the connector.
* @return true if the connector started successfully.
*/
public boolean startConnector(
String connName,
Map<String, String> connProps,
ConnectorContext ctx,
ConnectorStatus.Listener statusListener,
TargetState initialState
) {
try (LoggingContext loggingContext = LoggingContext.forConnector(connName)) {
if (connectors.containsKey(connName))
throw new ConnectException("Connector with name " + connName + " already exists");
final WorkerConnector workerConnector;
ClassLoader savedLoader = plugins.currentThreadLoader();
try {
final ConnectorConfig connConfig = new ConnectorConfig(plugins, connProps);
final String connClass = connConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
log.info("Creating connector {} of type {}", connName, connClass);
final Connector connector = plugins.newConnector(connClass);
workerConnector = new WorkerConnector(connName, connector, ctx, metrics, statusListener);
log.info("Instantiated connector {} with version {} of type {}", connName, connector.version(), connector.getClass());
savedLoader = plugins.compareAndSwapLoaders(connector);
workerConnector.initialize(connConfig);
workerConnector.transitionTo(initialState);
Plugins.compareAndSwapLoaders(savedLoader);
} catch (Throwable t) {
log.error("Failed to start connector {}", connName, t);
// The classloader can't be restored in a finally block because it must be swapped
// back before statusListener is invoked
Plugins.compareAndSwapLoaders(savedLoader);
workerMetricsGroup.recordConnectorStartupFailure();
statusListener.onFailure(connName, t);
return false;
}
WorkerConnector existing = connectors.putIfAbsent(connName, workerConnector);
if (existing != null)
throw new ConnectException("Connector with name " + connName + " already exists");
log.info("Finished creating connector {}", connName);
workerMetricsGroup.recordConnectorStartupSuccess();
}
return true;
}
/**
* Return true if the connector with the given name, managed by this worker, is a sink connector.
*
* @param connName the connector name.
* @return true if the connector belongs to the worker and is a sink connector.
* @throws ConnectException if the worker does not manage a connector with the given name.
*/
public boolean isSinkConnector(String connName) {
WorkerConnector workerConnector = connectors.get(connName);
if (workerConnector == null)
throw new ConnectException("Connector " + connName + " not found in this worker.");
ClassLoader savedLoader = plugins.currentThreadLoader();
try {
savedLoader = plugins.compareAndSwapLoaders(workerConnector.connector());
return workerConnector.isSinkConnector();
} finally {
Plugins.compareAndSwapLoaders(savedLoader);
}
}
/**
* Get a list of updated task properties for the tasks of this connector.
*
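* <p>Each returned map is one task's configuration and already includes {@code task.class} (and, for
* sink connectors, {@code topics} or {@code topics.regex}); a usage sketch:
* <pre>{@code
* for (Map<String, String> taskProps : worker.connectorTaskConfigs(connName, connConfig)) {
*     // persist taskProps; the herder later starts each task via startTask(...)
* }
* }</pre>
*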
* @param connName the connector name.
* @param connConfig the connector configuration.
* @return a list of updated task properties.
*/
public List<Map<String, String>> connectorTaskConfigs(String connName, ConnectorConfig connConfig) {
List<Map<String, String>> result = new ArrayList<>();
try (LoggingContext loggingContext = LoggingContext.forConnector(connName)) {
log.trace("Reconfiguring connector tasks for {}", connName);
WorkerConnector workerConnector = connectors.get(connName);
if (workerConnector == null)
throw new ConnectException("Connector " + connName + " not found in this worker.");
int maxTasks = connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG);
Map<String, String> connOriginals = connConfig.originalsStrings();
Connector connector = workerConnector.connector();
ClassLoader savedLoader = plugins.currentThreadLoader();
try {
savedLoader = plugins.compareAndSwapLoaders(connector);
String taskClassName = connector.taskClass().getName();
for (Map<String, String> taskProps : connector.taskConfigs(maxTasks)) {
// Ensure we don't modify the connector's copy of the config
Map<String, String> taskConfig = new HashMap<>(taskProps);
taskConfig.put(TaskConfig.TASK_CLASS_CONFIG, taskClassName);
if (connOriginals.containsKey(SinkTask.TOPICS_CONFIG)) {
taskConfig.put(SinkTask.TOPICS_CONFIG, connOriginals.get(SinkTask.TOPICS_CONFIG));
}
if (connOriginals.containsKey(SinkTask.TOPICS_REGEX_CONFIG)) {
taskConfig.put(SinkTask.TOPICS_REGEX_CONFIG, connOriginals.get(SinkTask.TOPICS_REGEX_CONFIG));
}
result.add(taskConfig);
}
} finally {
Plugins.compareAndSwapLoaders(savedLoader);
}
}
return result;
}
private void stopConnectors() {
// Herder is responsible for stopping connectors. This is an internal method to sequentially
// stop connectors that have not explicitly been stopped.
for (String connector: connectors.keySet())
stopConnector(connector);
}
/**
* Stop a connector managed by this worker.
*
* @param connName the connector name.
* @return true if the connector belonged to this worker and was successfully stopped.
*/
public boolean stopConnector(String connName) {
try (LoggingContext loggingContext = LoggingContext.forConnector(connName)) {
log.info("Stopping connector {}", connName);
WorkerConnector workerConnector = connectors.remove(connName);
if (workerConnector == null) {
log.warn("Ignoring stop request for unowned connector {}", connName);
return false;
}
ClassLoader savedLoader = plugins.currentThreadLoader();
try {
savedLoader = plugins.compareAndSwapLoaders(workerConnector.connector());
workerConnector.shutdown();
} finally {
Plugins.compareAndSwapLoaders(savedLoader);
}
log.info("Stopped connector {}", connName);
}
return true;
}
/**
* Get the IDs of the connectors currently running in this worker.
*
* @return the set of connector IDs.
*/
public Set<String> connectorNames() {
return connectors.keySet();
}
/**
* Return true if a connector with the given name is managed by this worker and is currently running.
*
* @param connName the connector name.
* @return true if the connector is running, false if the connector is not running or is not managed by this worker.
*/
public boolean isRunning(String connName) {
WorkerConnector workerConnector = connectors.get(connName);
return workerConnector != null && workerConnector.isRunning();
}
/**
* Start a task managed by this worker.
*
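* <p>A hedged sketch of a call (task class name is illustrative):
* <pre>{@code
* Map<String, String> taskProps = new HashMap<>();
* taskProps.put(TaskConfig.TASK_CLASS_CONFIG, "com.example.MySourceTask");
* worker.startTask(taskId, configState, connProps, taskProps, statusListener, TargetState.STARTED);
* }</pre>
*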
* @param id the task ID.
* @param configState a snapshot of the cluster configuration state.
* @param connProps the connector properties.
* @param taskProps the task properties.
* @param statusListener a listener for the runtime status transitions of the task.
* @param initialState the initial target state of the task.
* @return true if the task started successfully.
*/
public boolean startTask(
ConnectorTaskId id,
ClusterConfigState configState,
Map<String, String> connProps,
Map<String, String> taskProps,
TaskStatus.Listener statusListener,
TargetState initialState
) {
final WorkerTask workerTask;
try (LoggingContext loggingContext = LoggingContext.forTask(id)) {
log.info("Creating task {}", id);
if (tasks.containsKey(id))
throw new ConnectException("Task already exists in this worker: " + id);
connectorStatusMetricsGroup.recordTaskAdded(id);
ClassLoader savedLoader = plugins.currentThreadLoader();
try {
String connType = connProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
ClassLoader connectorLoader = plugins.delegatingLoader().connectorLoader(connType);
savedLoader = Plugins.compareAndSwapLoaders(connectorLoader);
final ConnectorConfig connConfig = new ConnectorConfig(plugins, connProps);
final TaskConfig taskConfig = new TaskConfig(taskProps);
final Class<? extends Task> taskClass = taskConfig.getClass(TaskConfig.TASK_CLASS_CONFIG).asSubclass(Task.class);
final Task task = plugins.newTask(taskClass);
log.info("Instantiated task {} with version {} of type {}", id, task.version(), taskClass.getName());
// By keeping the connector's classloader on this thread, we first search for converters
// within the connector's dependencies. If a converter isn't found there, the connector
// didn't configure a specific converter, so we instantiate one from the worker
// configuration instead.
Converter keyConverter = plugins.newConverter(connConfig, WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.CURRENT_CLASSLOADER);
Converter valueConverter = plugins.newConverter(connConfig, WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.CURRENT_CLASSLOADER);
HeaderConverter headerConverter = plugins.newHeaderConverter(connConfig, WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG,
ClassLoaderUsage.CURRENT_CLASSLOADER);
if (keyConverter == null) {
keyConverter = plugins.newConverter(config, WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.PLUGINS);
log.info("Set up the key converter {} for task {} using the worker config", keyConverter.getClass(), id);
} else {
log.info("Set up the key converter {} for task {} using the connector config", keyConverter.getClass(), id);
}
if (valueConverter == null) {
valueConverter = plugins.newConverter(config, WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.PLUGINS);
log.info("Set up the value converter {} for task {} using the worker config", valueConverter.getClass(), id);
} else {
log.info("Set up the value converter {} for task {} using the connector config", valueConverter.getClass(), id);
}
if (headerConverter == null) {
headerConverter = plugins.newHeaderConverter(config, WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.PLUGINS);
log.info("Set up the header converter {} for task {} using the worker config", headerConverter.getClass(), id);
} else {
log.info("Set up the header converter {} for task {} using the connector config", headerConverter.getClass(), id);
}
workerTask = buildWorkerTask(configState, connConfig, id, task, statusListener, initialState, keyConverter, valueConverter,
headerConverter, connectorLoader);
workerTask.initialize(taskConfig);
Plugins.compareAndSwapLoaders(savedLoader);
} catch (Throwable t) {
log.error("Failed to start task {}", id, t);
// The classloader can't be restored in a finally block because it must be swapped
// back before statusListener is invoked
Plugins.compareAndSwapLoaders(savedLoader);
connectorStatusMetricsGroup.recordTaskRemoved(id);
workerMetricsGroup.recordTaskFailure();
statusListener.onFailure(id, t);
return false;
}
WorkerTask existing = tasks.putIfAbsent(id, workerTask);
if (existing != null)
throw new ConnectException("Task already exists in this worker: " + id);
executor.submit(workerTask);
if (workerTask instanceof WorkerSourceTask) {
sourceTaskOffsetCommitter.schedule(id, (WorkerSourceTask) workerTask);
}
workerMetricsGroup.recordTaskSuccess();
return true;
}
}
private WorkerTask buildWorkerTask(ClusterConfigState configState,
ConnectorConfig connConfig,
ConnectorTaskId id,
Task task,
TaskStatus.Listener statusListener,
TargetState initialState,
Converter keyConverter,
Converter valueConverter,
HeaderConverter headerConverter,
ClassLoader loader) {
ErrorHandlingMetrics errorHandlingMetrics = errorHandlingMetrics(id);
final Class<? extends Connector> connectorClass = plugins.connectorClass(
connConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG));
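// The retry/tolerance behavior below is driven by the connector's "errors.*" configs,
// e.g. (illustrative values): errors.retry.timeout=0, errors.retry.delay.max.ms=60000, errors.tolerance=none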
RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(connConfig.errorRetryTimeout(),
connConfig.errorMaxDelayInMillis(), connConfig.errorToleranceType(), Time.SYSTEM);
retryWithToleranceOperator.metrics(errorHandlingMetrics);
// Decide which type of worker task we need based on the type of task.
if (task instanceof SourceTask) {
retryWithToleranceOperator.reporters(sourceTaskReporters(id, connConfig, errorHandlingMetrics));
TransformationChain<SourceRecord> transformationChain = new TransformationChain<>(connConfig.<SourceRecord>transformations(), retryWithToleranceOperator);
log.info("Initializing: {}", transformationChain);
OffsetStorageReader offsetReader = new OffsetStorageReaderImpl(offsetBackingStore, id.connector(),
internalKeyConverter, internalValueConverter);
OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetBackingStore, id.connector(),
internalKeyConverter, internalValueConverter);
Map<String, Object> producerProps = producerConfigs(id, "connector-producer-" + id, config, connConfig, connectorClass,
connectorClientConfigOverridePolicy);
KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps);
// Note we pass the configState as it performs dynamic transformations under the covers
return new WorkerSourceTask(id, (SourceTask) task, statusListener, initialState, keyConverter, valueConverter,
headerConverter, transformationChain, producer, offsetReader, offsetWriter, config, configState, metrics, loader,
time, retryWithToleranceOperator);
} else if (task instanceof SinkTask) {
TransformationChain<SinkRecord> transformationChain = new TransformationChain<>(connConfig.<SinkRecord>transformations(), retryWithToleranceOperator);
log.info("Initializing: {}", transformationChain);
SinkConnectorConfig sinkConfig = new SinkConnectorConfig(plugins, connConfig.originalsStrings());
retryWithToleranceOperator.reporters(sinkTaskReporters(id, sinkConfig, errorHandlingMetrics, connectorClass));
Map<String, Object> consumerProps = consumerConfigs(id, config, connConfig, connectorClass, connectorClientConfigOverridePolicy);
KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps);
return new WorkerSinkTask(id, (SinkTask) task, statusListener, initialState, config, configState, metrics, keyConverter,
valueConverter, headerConverter, transformationChain, consumer, loader, time,
retryWithToleranceOperator);
} else {
log.error("Tasks must be a subclass of either SourceTask or SinkTask", task);
throw new ConnectException("Tasks must be a subclass of either SourceTask or SinkTask");
}
}
static Map<String, Object> producerConfigs(ConnectorTaskId id,
String defaultClientId,
WorkerConfig config,
ConnectorConfig connConfig,
Class<? extends Connector> connectorClass,
ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
Map<String, Object> producerProps = new HashMap<>();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Utils.join(config.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ","));
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
// These settings cause the producer to retry indefinitely on retriable exceptions. They *may* be overridden
// via configs passed to the worker, but doing so may compromise the delivery guarantees of Kafka Connect.
producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, Integer.toString(Integer.MAX_VALUE));
producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, Long.toString(Long.MAX_VALUE));
producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.toString(Integer.MAX_VALUE));
producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, defaultClientId);
// User-specified overrides
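// e.g. a worker config line "producer.acks=1" (illustrative) is applied here with the "producer." prefix stripped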
producerProps.putAll(config.originalsWithPrefix("producer."));
// Connector-specified overrides
Map<String, Object> producerOverrides =
connectorClientConfigOverrides(id, connConfig, connectorClass, ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX,
ConnectorType.SOURCE, ConnectorClientConfigRequest.ClientType.PRODUCER,
connectorClientConfigOverridePolicy);
producerProps.putAll(producerOverrides);
return producerProps;
}
static Map<String, Object> consumerConfigs(ConnectorTaskId id,
WorkerConfig config,
ConnectorConfig connConfig,
Class<? extends Connector> connectorClass,
ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
// Include any unknown worker configs so consumer configs can be set globally on the worker
// and passed through to the tasks
Map<String, Object> consumerProps = new HashMap<>();
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, SinkUtils.consumerGroupId(id.connector()));
consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "connector-consumer-" + id);
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
Utils.join(config.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ","));
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
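// Worker-level overrides: e.g. "consumer.max.poll.records=500" (illustrative) is applied with the prefix stripped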
consumerProps.putAll(config.originalsWithPrefix("consumer."));
// Connector-specified overrides
Map<String, Object> consumerOverrides =
connectorClientConfigOverrides(id, connConfig, connectorClass, ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX,
ConnectorType.SINK, ConnectorClientConfigRequest.ClientType.CONSUMER,
connectorClientConfigOverridePolicy);
consumerProps.putAll(consumerOverrides);
return consumerProps;
}
static Map<String, Object> adminConfigs(ConnectorTaskId id,
WorkerConfig config,
ConnectorConfig connConfig,
Class<? extends Connector> connectorClass,
ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
Map<String, Object> adminProps = new HashMap<>();
// Use the top-level worker configs to retain backwards compatibility with older releases which
// did not require a prefix for connector admin client configs in the worker configuration file
// Ignore configs that begin with "admin." since those will be added next (with the prefix stripped)
// and those that begin with "producer." and "consumer.", since we know they aren't intended for
// the admin client
Map<String, Object> nonPrefixedWorkerConfigs = config.originals().entrySet().stream()
.filter(e -> !e.getKey().startsWith("admin.")
&& !e.getKey().startsWith("producer.")
&& !e.getKey().startsWith("consumer."))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
Utils.join(config.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ","));
adminProps.putAll(nonPrefixedWorkerConfigs);
// Admin client-specific overrides in the worker config
adminProps.putAll(config.originalsWithPrefix("admin."));
// Connector-specified overrides
Map<String, Object> adminOverrides =
connectorClientConfigOverrides(id, connConfig, connectorClass, ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX,
ConnectorType.SINK, ConnectorClientConfigRequest.ClientType.ADMIN,
connectorClientConfigOverridePolicy);
adminProps.putAll(adminOverrides);
return adminProps;
}
private static Map<String, Object> connectorClientConfigOverrides(ConnectorTaskId id,
ConnectorConfig connConfig,
Class<? extends Connector> connectorClass,
String clientConfigPrefix,
ConnectorType connectorType,
ConnectorClientConfigRequest.ClientType clientType,
ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
Map<String, Object> clientOverrides = connConfig.originalsWithPrefix(clientConfigPrefix);
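// The prefix is stripped here: e.g. with the "producer.override." prefix, a connector config entry
// "producer.override.acks=all" (illustrative) surfaces as {"acks": "all"} in clientOverrides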
ConnectorClientConfigRequest connectorClientConfigRequest = new ConnectorClientConfigRequest(
id.connector(),
connectorType,
connectorClass,
clientOverrides,
clientType
);
List<ConfigValue> configValues = connectorClientConfigOverridePolicy.validate(connectorClientConfigRequest);
List<ConfigValue> errorConfigs = configValues.stream()
.filter(configValue -> !configValue.errorMessages().isEmpty())
.collect(Collectors.toList());
// These should be caught when the herder validates the connector configuration, but just in case
if (!errorConfigs.isEmpty()) {
throw new ConnectException("Client Config Overrides not allowed " + errorConfigs);
}
return clientOverrides;
}
ErrorHandlingMetrics errorHandlingMetrics(ConnectorTaskId id) {
return new ErrorHandlingMetrics(id, metrics);
}
private List<ErrorReporter> sinkTaskReporters(ConnectorTaskId id, SinkConnectorConfig connConfig,
ErrorHandlingMetrics errorHandlingMetrics,
Class<? extends Connector> connectorClass) {
ArrayList<ErrorReporter> reporters = new ArrayList<>();
LogReporter logReporter = new LogReporter(id, connConfig, errorHandlingMetrics);
reporters.add(logReporter);
// Add a dead letter queue reporter if a DLQ topic name ("errors.deadletterqueue.topic.name") is configured
String topic = connConfig.dlqTopicName();
if (topic != null && !topic.isEmpty()) {
Map<String, Object> producerProps = producerConfigs(id, "connector-dlq-producer-" + id, config, connConfig, connectorClass,
connectorClientConfigOverridePolicy);
Map<String, Object> adminProps = adminConfigs(id, config, connConfig, connectorClass, connectorClientConfigOverridePolicy);
DeadLetterQueueReporter reporter = DeadLetterQueueReporter.createAndSetup(adminProps, id, connConfig, producerProps, errorHandlingMetrics);
reporters.add(reporter);
}
return reporters;
}
private List<ErrorReporter> sourceTaskReporters(ConnectorTaskId id, ConnectorConfig connConfig,
ErrorHandlingMetrics errorHandlingMetrics) {
List<ErrorReporter> reporters = new ArrayList<>();
LogReporter logReporter = new LogReporter(id, connConfig, errorHandlingMetrics);
reporters.add(logReporter);
return reporters;
}
private void stopTask(ConnectorTaskId taskId) {
try (LoggingContext loggingContext = LoggingContext.forTask(taskId)) {
WorkerTask task = tasks.get(taskId);
if (task == null) {
log.warn("Ignoring stop request for unowned task {}", taskId);
return;
}
log.info("Stopping task {}", task.id());
if (task instanceof WorkerSourceTask)
sourceTaskOffsetCommitter.remove(task.id());
ClassLoader savedLoader = plugins.currentThreadLoader();
try {
savedLoader = Plugins.compareAndSwapLoaders(task.loader());
task.stop();
} finally {
Plugins.compareAndSwapLoaders(savedLoader);
}
}
}
private void stopTasks(Collection<ConnectorTaskId> ids) {
// Herder is responsible for stopping tasks. This is an internal method to sequentially
// stop the tasks that have not explicitly been stopped.
for (ConnectorTaskId taskId : ids) {
stopTask(taskId);
}
}
private void awaitStopTask(ConnectorTaskId taskId, long timeout) {
try (LoggingContext loggingContext = LoggingContext.forTask(taskId)) {
WorkerTask task = tasks.remove(taskId);
connectorStatusMetricsGroup.recordTaskRemoved(taskId);
if (task == null) {
log.warn("Ignoring await stop request for non-present task {}", taskId);
return;
}
if (!task.awaitStop(timeout)) {
log.error("Graceful stop of task {} failed.", task.id());
task.cancel();
} else {
log.debug("Graceful stop of task {} succeeded.", task.id());
}
}
}
private void awaitStopTasks(Collection<ConnectorTaskId> ids) {
long now = time.milliseconds();
long deadline = now + config.getLong(WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG);
for (ConnectorTaskId id : ids) {
long remaining = Math.max(0, deadline - time.milliseconds());
awaitStopTask(id, remaining);
}
}
/**
* Asynchronously stop all of this worker's tasks and await their termination.
*/
public void stopAndAwaitTasks() {
stopAndAwaitTasks(new ArrayList<>(tasks.keySet()));
}
/**
* Asynchronously stop a collection of tasks that belong to this worker and await their termination.
*
* @param ids the collection of tasks to be stopped.
*/
public void stopAndAwaitTasks(Collection<ConnectorTaskId> ids) {
stopTasks(ids);
awaitStopTasks(ids);
}
/**
* Stop a task that belongs to this worker and await its termination.
*
* @param taskId the ID of the task to be stopped.
*/
public void stopAndAwaitTask(ConnectorTaskId taskId) {
stopTask(taskId);
awaitStopTasks(Collections.singletonList(taskId));
}
/**
* Get the IDs of the tasks currently running in this worker.
*/
public Set<ConnectorTaskId> taskIds() {
return tasks.keySet();
}
public Converter getInternalKeyConverter() {
return internalKeyConverter;
}
public Converter getInternalValueConverter() {
return internalValueConverter;
}
public Plugins getPlugins() {
return plugins;
}
public String workerId() {
return workerId;
}
/**
* Get the {@link ConnectMetrics} that uses Kafka Metrics and manages the JMX reporter.
* @return the Connect-specific metrics; never null
*/
public ConnectMetrics metrics() {
return metrics;
}
public void setTargetState(String connName, TargetState state) {
log.info("Setting connector {} state to {}", connName, state);
WorkerConnector workerConnector = connectors.get(connName);
if (workerConnector != null) {
ClassLoader connectorLoader =
plugins.delegatingLoader().connectorLoader(workerConnector.connector());
transitionTo(workerConnector, state, connectorLoader);
}
for (Map.Entry<ConnectorTaskId, WorkerTask> taskEntry : tasks.entrySet()) {
if (taskEntry.getKey().connector().equals(connName)) {
WorkerTask workerTask = taskEntry.getValue();
transitionTo(workerTask, state, workerTask.loader());
}
}
}
private void transitionTo(Object connectorOrTask, TargetState state, ClassLoader loader) {
ClassLoader savedLoader = plugins.currentThreadLoader();
try {
savedLoader = Plugins.compareAndSwapLoaders(loader);
if (connectorOrTask instanceof WorkerConnector) {
((WorkerConnector) connectorOrTask).transitionTo(state);
} else if (connectorOrTask instanceof WorkerTask) {
((WorkerTask) connectorOrTask).transitionTo(state);
} else {
throw new ConnectException(
"Request for state transition on an object that is neither a "
+ "WorkerConnector nor a WorkerTask: "
+ connectorOrTask.getClass());
}
} finally {
Plugins.compareAndSwapLoaders(savedLoader);
}
}
ConnectorStatusMetricsGroup connectorStatusMetricsGroup() {
return connectorStatusMetricsGroup;
}
WorkerMetricsGroup workerMetricsGroup() {
return workerMetricsGroup;
}
static class ConnectorStatusMetricsGroup {
private ConnectMetrics connectMetrics;
private ConnectMetricsRegistry registry;
private ConcurrentMap<String, MetricGroup> connectorStatusMetrics = new ConcurrentHashMap<>();
private Herder herder;
private ConcurrentMap<ConnectorTaskId, WorkerTask> tasks;
protected ConnectorStatusMetricsGroup(
ConnectMetrics connectMetrics, ConcurrentMap<ConnectorTaskId, WorkerTask> tasks, Herder herder) {
this.connectMetrics = connectMetrics;
this.registry = connectMetrics.registry();
this.tasks = tasks;
this.herder = herder;
}
protected ConnectMetrics.LiteralSupplier<Long> taskCounter(String connName) {
return now -> tasks.keySet()
.stream()
.filter(taskId -> taskId.connector().equals(connName))
.count();
}
protected ConnectMetrics.LiteralSupplier<Long> taskStatusCounter(String connName, TaskStatus.State state) {
return now -> tasks.values()
.stream()
.filter(task ->
task.id().connector().equals(connName) &&
herder.taskStatus(task.id()).state().equalsIgnoreCase(state.toString()))
.count();
}
protected synchronized void recordTaskAdded(ConnectorTaskId connectorTaskId) {
if (connectorStatusMetrics.containsKey(connectorTaskId.connector())) {
return;
}
String connName = connectorTaskId.connector();
MetricGroup metricGroup = connectMetrics.group(registry.workerGroupName(),
registry.connectorTagName(), connName);
metricGroup.addValueMetric(registry.connectorTotalTaskCount, taskCounter(connName));
for (Map.Entry<MetricNameTemplate, TaskStatus.State> statusMetric : registry.connectorStatusMetrics.entrySet()) {
metricGroup.addValueMetric(statusMetric.getKey(), taskStatusCounter(connName, statusMetric.getValue()));
}
connectorStatusMetrics.put(connectorTaskId.connector(), metricGroup);
}
protected synchronized void recordTaskRemoved(ConnectorTaskId connectorTaskId) {
// Unregister connector task count metric if we remove the last task of the connector
if (tasks.keySet().stream().noneMatch(id -> id.connector().equals(connectorTaskId.connector()))) {
connectorStatusMetrics.get(connectorTaskId.connector()).close();
connectorStatusMetrics.remove(connectorTaskId.connector());
}
}
protected synchronized void close() {
for (MetricGroup metricGroup: connectorStatusMetrics.values()) {
metricGroup.close();
}
}
protected MetricGroup metricGroup(String connectorId) {
return connectorStatusMetrics.get(connectorId);
}
}
class WorkerMetricsGroup {
private final MetricGroup metricGroup;
private final Sensor connectorStartupAttempts;
private final Sensor connectorStartupSuccesses;
private final Sensor connectorStartupFailures;
private final Sensor connectorStartupResults;
private final Sensor taskStartupAttempts;
private final Sensor taskStartupSuccesses;
private final Sensor taskStartupFailures;
private final Sensor taskStartupResults;
public WorkerMetricsGroup(ConnectMetrics connectMetrics) {
ConnectMetricsRegistry registry = connectMetrics.registry();
metricGroup = connectMetrics.group(registry.workerGroupName());
metricGroup.addValueMetric(registry.connectorCount, now -> (double) connectors.size());
metricGroup.addValueMetric(registry.taskCount, now -> (double) tasks.size());
MetricName connectorFailurePct = metricGroup.metricName(registry.connectorStartupFailurePercentage);
MetricName connectorSuccessPct = metricGroup.metricName(registry.connectorStartupSuccessPercentage);
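// Frequencies.forBooleanValues maps recorded 0.0 samples to the first (failure) metric and 1.0 samples
// to the second (success) metric, so the pair reports failure/success percentages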
Frequencies connectorStartupResultFrequencies = Frequencies.forBooleanValues(connectorFailurePct, connectorSuccessPct);
connectorStartupResults = metricGroup.sensor("connector-startup-results");
connectorStartupResults.add(connectorStartupResultFrequencies);
connectorStartupAttempts = metricGroup.sensor("connector-startup-attempts");
connectorStartupAttempts.add(metricGroup.metricName(registry.connectorStartupAttemptsTotal), new CumulativeSum());
connectorStartupSuccesses = metricGroup.sensor("connector-startup-successes");
connectorStartupSuccesses.add(metricGroup.metricName(registry.connectorStartupSuccessTotal), new CumulativeSum());
connectorStartupFailures = metricGroup.sensor("connector-startup-failures");
connectorStartupFailures.add(metricGroup.metricName(registry.connectorStartupFailureTotal), new CumulativeSum());
MetricName taskFailurePct = metricGroup.metricName(registry.taskStartupFailurePercentage);
MetricName taskSuccessPct = metricGroup.metricName(registry.taskStartupSuccessPercentage);
Frequencies taskStartupResultFrequencies = Frequencies.forBooleanValues(taskFailurePct, taskSuccessPct);
taskStartupResults = metricGroup.sensor("task-startup-results");
taskStartupResults.add(taskStartupResultFrequencies);
taskStartupAttempts = metricGroup.sensor("task-startup-attempts");
taskStartupAttempts.add(metricGroup.metricName(registry.taskStartupAttemptsTotal), new CumulativeSum());
taskStartupSuccesses = metricGroup.sensor("task-startup-successes");
taskStartupSuccesses.add(metricGroup.metricName(registry.taskStartupSuccessTotal), new CumulativeSum());
taskStartupFailures = metricGroup.sensor("task-startup-failures");
taskStartupFailures.add(metricGroup.metricName(registry.taskStartupFailureTotal), new CumulativeSum());
}
void close() {
metricGroup.close();
}
void recordConnectorStartupFailure() {
connectorStartupAttempts.record(1.0);
connectorStartupFailures.record(1.0);
connectorStartupResults.record(0.0);
}
void recordConnectorStartupSuccess() {
connectorStartupAttempts.record(1.0);
connectorStartupSuccesses.record(1.0);
connectorStartupResults.record(1.0);
}
void recordTaskFailure() {
taskStartupAttempts.record(1.0);
taskStartupFailures.record(1.0);
taskStartupResults.record(0.0);
}
void recordTaskSuccess() {
taskStartupAttempts.record(1.0);
taskStartupSuccesses.record(1.0);
taskStartupResults.record(1.0);
}
protected MetricGroup metricGroup() {
return metricGroup;
}
}
}