blob: c24dd6c7907a0515541b5096874d5abdf5c2ef45 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.RestartRequest;
import org.apache.kafka.connect.runtime.SessionKey;
import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.WorkerConfigTransformer;
import org.apache.kafka.connect.runtime.distributed.ConnectProtocolCompatibility;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.apache.kafka.connect.util.TopicAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.crypto.spec.SecretKeySpec;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import static org.apache.kafka.connect.runtime.TargetState.PAUSED;
import static org.apache.kafka.connect.runtime.TargetState.STOPPED;
import static org.apache.kafka.connect.util.ConnectUtils.className;
* <p>
* Provides persistent storage of Kafka Connect connector configurations in a Kafka topic.
* </p>
* <p>
* This class manages both connector and task configurations, among other various configurations. It tracks seven types
* of records:
* <ol>
* <li> Connector config: map of string -> string configurations passed to the Connector class, with support for
* expanding this format if necessary. (Kafka key: connector-[connector-id]).
* These configs are *not* ephemeral. They represent the source of truth. If the entire Connect
* cluster goes down, this is all that is really needed to recover.
* <li> Task configs: map of string -> string configurations passed to the Task class, with support for expanding
* this format if necessary. (Kafka key: task-[connector-id]-[task-id]).
* These configs are ephemeral; they are stored here to:
* <ul>
* <li> disseminate them to all workers while ensuring agreement
* <li> to allow faster cluster/worker recovery since the common case of recovery (restoring a connector) will simply
* result in the same task configuration as before the failure.
* </ul>
* <li> Task commit "configs": records indicating that previous task config entries should be committed and all task
* configs for a connector can be applied. (Kafka key: commit-[connector-id].
* This config has two effects. First, it records the number of tasks the connector is currently
* running (and can therefore increase/decrease parallelism). Second, because each task config
* is stored separately but they need to be applied together to ensure each partition is assigned
* to a single task, this record also indicates that task configs for the specified connector
* can be "applied" or "committed".
* <li> Connector target states: records indicating the {@link TargetState} for a connector
* <li> {@link RestartRequest Restart requests}: records representing requests to restart a connector and / or its
* tasks. See <a href="">KIP-745</a> for more
* details.
* <li> Task count records: an integer value that that tracks the number of task producers (for source connectors) that
* will have to be fenced out if a connector is reconfigured before bringing up any tasks with the new set of task
* configurations. This is required for exactly-once support for source connectors, see
* <a href="">KIP-618</a>
* for more details.
* <li> Session key records: A {@link SessionKey} generated by the leader of the cluster when
* {@link ConnectProtocolCompatibility#SESSIONED} is the Connect protocol being used by the cluster. This session key
* is used to verify internal REST requests between workers in the cluster. See
* <a href="">KIP-507</a>
* for more details.
* </ol>
* <p/>
* <p>
* This configuration is expected to be stored in a *single partition* and *compacted* topic. Using a single partition
* ensures we can enforce ordering on messages, allowing Kafka to be used as a write ahead log. Compaction allows
* us to clean up outdated configurations over time. However, this combination has some important implications for
* the implementation of this class and the configuration state that it may expose.
* </p>
* <p>
* Connector configurations are independent of all other configs, so they are handled easily. Writing a single record
* is already atomic, so these can be applied as soon as they are read. One connectors config does not affect any
* others, and they do not need to coordinate with the connector's task configuration at all.
* </p>
* <p>
* The most obvious implication for task configs is the need for the commit messages. This class doesn't currently make
* use of Kafka's multi-record transactions and instead uses task commit messages to ensure that readers do not
* end up using inconsistent configs. For example, consider if a connector wrote configs for its tasks,
* then was reconfigured and only managed to write updated configs for half its tasks. If task configs
* were applied immediately you could be using half the old configs and half the new configs. In that condition, some
* partitions may be double-assigned because the old config and new config may use completely different assignments.
* Therefore, when reading the log, we must buffer config updates for a connector's tasks and only apply atomically them
* once a commit message has been read.
* </p>
* <p>
* However, there are also further challenges. This simple buffering approach would work fine as long as the entire log was
* always available, but we would like to be able to enable compaction so our configuration topic does not grow
* indefinitely. Compaction may break a normal log because old entries will suddenly go missing. A new worker reading
* from the beginning of the log in order to build up the full current configuration will see task commits, but some
* records required for those commits will have been removed because the same keys have subsequently been rewritten.
* For example, if you have a sequence of record keys [connector-foo-config, task-foo-1-config, task-foo-2-config,
* commit-foo (2 tasks), task-foo-1-config, commit-foo (1 task)], we can end up with a compacted log containing
* [connector-foo-config, task-foo-2-config, commit-foo (2 tasks), task-foo-1-config, commit-foo (1 task)]. When read
* back, the first commit will see an invalid state because the first task-foo-1-config has been cleaned up.
* </p>
* <p>
* Compaction can further complicate things if writing new task configs fails mid-write. Consider a similar scenario
* as the previous one, but in this case both the first and second update will write 2 task configs. However, the
* second write fails half of the way through:
* [connector-foo-config, task-foo-1-config, task-foo-2-config, commit-foo (2 tasks), task-foo-1-config]. Now compaction
* occurs and we're left with
* [connector-foo-config, task-foo-2-config, commit-foo (2 tasks), task-foo-1-config]. At the first commit, we don't
* have a complete set of configs. And because of the failure, there is no second commit. We are left in an inconsistent
* state with no obvious way to resolve the issue -- we can try to keep on reading, but the failed node may never
* recover and write the updated config. Meanwhile, other workers may have seen the entire log; they will see the second
* task-foo-1-config waiting to be applied, but will otherwise think everything is ok -- they have a valid set of task
* configs for connector "foo".
* </p>
* <p>
* Because we can encounter these inconsistencies and addressing them requires support from the rest of the system
* (resolving the task configuration inconsistencies requires support from the connector instance to regenerate updated
* configs), this class exposes not only the current set of configs, but also which connectors have inconsistent data.
* This allows users of this class (i.e., {@link Herder} implementations) to take action to resolve any inconsistencies. These
* inconsistencies should be rare (as described above, due to compaction combined with leader failures in the middle
* of updating task configurations).
* </p>
* <p>
* Note that the expectation is that this config storage system has only a single writer at a time.
* The caller (Herder) must ensure this is the case. In distributed mode this will require forwarding config change
* requests to the leader in the cluster (i.e. the worker group coordinated by the Kafka broker).
* </p>
* <p>
* Since processing of the config log occurs in a background thread, callers must take care when using accessors.
* To simplify handling this correctly, this class only exposes a mechanism to snapshot the current state of the cluster.
* Updates may continue to be applied (and callbacks invoked) in the background. Callers must take care that they are
* using a consistent snapshot and only update when it is safe. In particular, if task configs are updated which require
* synchronization across workers to commit offsets and update the configuration, callbacks and updates during the
* rebalance must be deferred.
* </p>
public class KafkaConfigBackingStore extends KafkaTopicBasedBackingStore implements ConfigBackingStore {
private static final Logger log = LoggerFactory.getLogger(KafkaConfigBackingStore.class);
public static final String TARGET_STATE_PREFIX = "target-state-";
public static String TARGET_STATE_KEY(String connectorName) {
return TARGET_STATE_PREFIX + connectorName;
public static final String CONNECTOR_PREFIX = "connector-";
public static String CONNECTOR_KEY(String connectorName) {
return CONNECTOR_PREFIX + connectorName;
public static final String TASK_PREFIX = "task-";
public static String TASK_KEY(ConnectorTaskId taskId) {
return TASK_PREFIX + taskId.connector() + "-" + taskId.task();
public static final String COMMIT_TASKS_PREFIX = "commit-";
public static String COMMIT_TASKS_KEY(String connectorName) {
return COMMIT_TASKS_PREFIX + connectorName;
public static final String TASK_COUNT_RECORD_PREFIX = "tasks-fencing-";
public static String TASK_COUNT_RECORD_KEY(String connectorName) {
return TASK_COUNT_RECORD_PREFIX + connectorName;
public static final String SESSION_KEY_KEY = "session-key";
// Note that while using real serialization for values as we have here, but ad hoc string serialization for keys,
// isn't ideal, we use this approach because it avoids any potential problems with schema evolution or
// converter/serializer changes causing keys to change. We need to absolutely ensure that the keys remain precisely
// the same.
public static final Schema CONNECTOR_CONFIGURATION_V0 = SchemaBuilder.struct()
.field("properties",, Schema.OPTIONAL_STRING_SCHEMA).build())
public static final Schema CONNECTOR_TASKS_COMMIT_V0 = SchemaBuilder.struct()
.field("tasks", Schema.INT32_SCHEMA)
public static final Schema TARGET_STATE_V0 = SchemaBuilder.struct()
.field("state", Schema.STRING_SCHEMA)
public static final Schema TARGET_STATE_V1 = SchemaBuilder.struct()
.field("state", Schema.STRING_SCHEMA)
.field("state.v2", Schema.OPTIONAL_STRING_SCHEMA)
public static final Schema TASK_COUNT_RECORD_V0 = SchemaBuilder.struct()
.field("task-count", Schema.INT32_SCHEMA)
// The key is logically a byte array, but we can't use the JSON converter to (de-)serialize that without a schema.
// So instead, we base 64-encode it before serializing and decode it after deserializing.
public static final Schema SESSION_KEY_V0 = SchemaBuilder.struct()
.field("key", Schema.STRING_SCHEMA)
.field("algorithm", Schema.STRING_SCHEMA)
.field("creation-timestamp", Schema.INT64_SCHEMA)
public static final String RESTART_PREFIX = "restart-connector-";
public static String RESTART_KEY(String connectorName) {
return RESTART_PREFIX + connectorName;
public static final boolean ONLY_FAILED_DEFAULT = false;
public static final boolean INCLUDE_TASKS_DEFAULT = false;
public static final String ONLY_FAILED_FIELD_NAME = "only-failed";
public static final String INCLUDE_TASKS_FIELD_NAME = "include-tasks";
public static final Schema RESTART_REQUEST_V0 = SchemaBuilder.struct()
public static final String LOGGER_CLUSTER_PREFIX = "logger-cluster-";
public static String LOGGER_CLUSTER_KEY(String namespace) {
return LOGGER_CLUSTER_PREFIX + namespace;
public static final Schema LOGGER_LEVEL_V0 = SchemaBuilder.struct()
.field("level", Schema.STRING_SCHEMA)
// Visible for testing
static final long READ_WRITE_TOTAL_TIMEOUT_MS = 30000;
private final Object lock;
private final Converter converter;
private volatile boolean started;
// Although updateListener is not final, it's guaranteed to be visible to any thread after its
// initialization as long as we always read the volatile variable "started" before we access the listener.
private ConfigBackingStore.UpdateListener updateListener;
private final Map<String, Object> baseProducerProps;
private final String topic;
// Data is passed to the log already serialized. We use a converter to handle translating to/from generic Connect
// format to serialized form
private KafkaBasedLog<String, byte[]> configLog;
// Connector -> # of tasks
final Map<String, Integer> connectorTaskCounts = new HashMap<>();
// Connector and task configs: name or id -> config map
final Map<String, Map<String, String>> connectorConfigs = new HashMap<>();
final Map<ConnectorTaskId, Map<String, String>> taskConfigs = new HashMap<>();
private final Supplier<TopicAdmin> topicAdminSupplier;
private final String clientId;
// Set of connectors where we saw a task commit with an incomplete set of task config updates, indicating the data
// is in an inconsistent state and we cannot safely use them until they have been refreshed.
final Set<String> inconsistent = new HashSet<>();
// The most recently read offset. This does not take into account deferred task updates/commits, so we may have
// outstanding data to be applied.
private volatile long offset;
// The most recently read session key, to use for validating internal REST requests.
private volatile SessionKey sessionKey;
// Connector -> Map[ConnectorTaskId -> Configs]
// visible for testing
final Map<String, Map<ConnectorTaskId, Map<String, String>>> deferredTaskUpdates = new HashMap<>();
final Map<String, TargetState> connectorTargetStates = new HashMap<>();
final Map<String, Integer> connectorTaskCountRecords = new HashMap<>();
final Map<String, Integer> connectorTaskConfigGenerations = new HashMap<>();
final Set<String> connectorsPendingFencing = new HashSet<>();
private final WorkerConfigTransformer configTransformer;
private final boolean usesFencableWriter;
private volatile Producer<String, byte[]> fencableProducer;
private final Map<String, Object> fencableProducerProps;
private final Time time;
void setConfigLog(KafkaBasedLog<String, byte[]> configLog) {
this.configLog = configLog;
public KafkaConfigBackingStore(Converter converter, DistributedConfig config, WorkerConfigTransformer configTransformer, Supplier<TopicAdmin> adminSupplier, String clientIdBase) {
this(converter, config, configTransformer, adminSupplier, clientIdBase, Time.SYSTEM);
KafkaConfigBackingStore(Converter converter, DistributedConfig config, WorkerConfigTransformer configTransformer, Supplier<TopicAdmin> adminSupplier, String clientIdBase, Time time) {
this.lock = new Object();
this.started = false;
this.converter = converter;
this.offset = -1;
this.topicAdminSupplier = adminSupplier;
this.clientId = Objects.requireNonNull(clientIdBase) + "configs";
this.time = time;
this.baseProducerProps = baseProducerProps(config);
// By default, Connect disables idempotent behavior for all producers, even though idempotence became
// default for Kafka producers. This is to ensure Connect continues to work with many Kafka broker versions, including older brokers that do not support
// idempotent producers or require explicit steps to enable them (e.g. adding the IDEMPOTENT_WRITE ACL to brokers older than 2.8).
// These settings might change when
// gets approved and scheduled for release.
baseProducerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false");
this.fencableProducerProps = fencableProducerProps(config);
this.usesFencableWriter = config.transactionalLeaderEnabled();
this.topic = config.getString(DistributedConfig.CONFIG_TOPIC_CONFIG);
if (this.topic == null || this.topic.trim().isEmpty())
throw new ConfigException("Must specify topic for connector configuration.");
configLog = setupAndCreateKafkaBasedLog(this.topic, config);
this.configTransformer = configTransformer;
public void setUpdateListener(ConfigBackingStore.UpdateListener listener) {
this.updateListener = listener;
public void start() {"Starting KafkaConfigBackingStore");
// Before startup, callbacks are *not* invoked. You can grab a snapshot after starting -- just take care that
// updates can continue to occur in the background
try {
} catch (UnsupportedVersionException e) {
throw new ConnectException(
"Enabling exactly-once support for source connectors requires a Kafka broker version that allows "
+ "admin clients to read consumer offsets. Please either disable the worker's exactly-once "
+ "support for source connectors, or use a newer Kafka broker version.",
int partitionCount = configLog.partitionCount();
if (partitionCount > 1) {
String msg = String.format("Topic '%s' supplied via the '%s' property is required "
+ "to have a single partition in order to guarantee consistency of "
+ "connector configurations, but found %d partitions.",
topic, DistributedConfig.CONFIG_TOPIC_CONFIG, partitionCount);
throw new ConfigException(msg);
started = true;"Started KafkaConfigBackingStore");
public void stop() {"Closing KafkaConfigBackingStore");
Utils.closeQuietly(configLog::stop, "KafkaBasedLog for config topic");"Closed KafkaConfigBackingStore");
public void claimWritePrivileges() {
if (usesFencableWriter && fencableProducer == null) {
try {
fencableProducer = createFencableProducer();
} catch (Exception e) {
throw new ConnectException("Failed to create and initialize fencable producer for config topic", e);
private Map<String, Object> baseProducerProps(WorkerConfig workerConfig) {
Map<String, Object> producerProps = new HashMap<>(workerConfig.originals());
String kafkaClusterId = workerConfig.kafkaClusterId();
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE);
ConnectUtils.addMetricsContextProperties(producerProps, workerConfig, kafkaClusterId);
return producerProps;
// Visible for testing
Map<String, Object> fencableProducerProps(DistributedConfig workerConfig) {
Map<String, Object> result = new HashMap<>(baseProducerProps(workerConfig));
result.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-leader");
// Always require producer acks to all to ensure durable writes
result.put(ProducerConfig.ACKS_CONFIG, "all");
// We can set this to 5 instead of 1 without risking reordering because we are using an idempotent producer
result.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
result, ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true",
"for the worker's config topic producer when exactly-once source support is enabled or in preparation to be enabled",
result, ProducerConfig.TRANSACTIONAL_ID_CONFIG, workerConfig.transactionalProducerId(),
"for the worker's config topic producer when exactly-once source support is enabled or in preparation to be enabled",
return result;
// Visible in order to be mocked during testing
Producer<String, byte[]> createFencableProducer() {
return new KafkaProducer<>(fencableProducerProps);
* Get a snapshot of the current state of the cluster.
public ClusterConfigState snapshot() {
synchronized (lock) {
// Only a shallow copy is performed here; in order to avoid accidentally corrupting the worker's view
// of the config topic, any nested structures should be copied before making modifications
return new ClusterConfigState(
new HashMap<>(connectorTaskCounts),
new HashMap<>(connectorConfigs),
new HashMap<>(connectorTargetStates),
new HashMap<>(taskConfigs),
new HashMap<>(connectorTaskCountRecords),
new HashMap<>(connectorTaskConfigGenerations),
new HashSet<>(connectorsPendingFencing),
new HashSet<>(inconsistent),
public boolean contains(String connector) {
synchronized (lock) {
return connectorConfigs.containsKey(connector);
* Write this connector configuration (and optionally a target state) to persistent storage and wait until it has been acknowledged and read
* back by tailing the Kafka log with a consumer. {@link #claimWritePrivileges()} must be successfully invoked before calling
* this method if the worker is configured to use a fencable producer for writes to the config topic.
* @param connector name of the connector to write data for
* @param properties the configuration to write
* @param targetState the desired target state for the connector; may be {@code null} if no target state change is desired. Note that the default
* target state is {@link TargetState#STARTED} if no target state exists previously
* @throws IllegalStateException if {@link #claimWritePrivileges()} is required, but was not successfully invoked before
* this method was called
* @throws PrivilegedWriteException if the worker is configured to use a fencable producer for writes to the config topic
* and the write fails
public void putConnectorConfig(String connector, Map<String, String> properties, TargetState targetState) {
log.debug("Writing connector configuration for connector '{}'", connector);
Struct connectConfig = new Struct(CONNECTOR_CONFIGURATION_V0);
connectConfig.put("properties", properties);
byte[] serializedConfig = converter.fromConnectData(topic, CONNECTOR_CONFIGURATION_V0, connectConfig);
try {
Timer timer = time.timer(READ_WRITE_TOTAL_TIMEOUT_MS);
List<ProducerKeyValue> keyValues = new ArrayList<>();
if (targetState != null) {
log.debug("Writing target state {} for connector {}", targetState, connector);
keyValues.add(new ProducerKeyValue(TARGET_STATE_KEY(connector), serializeTargetState(targetState)));
keyValues.add(new ProducerKeyValue(CONNECTOR_KEY(connector), serializedConfig));
sendPrivileged(keyValues, timer);
configLog.readToEnd().get(timer.remainingMs(), TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
log.error("Failed to write connector configuration to Kafka: ", e);
throw new ConnectException("Error writing connector configuration to Kafka", e);
* Remove configuration for a given connector. {@link #claimWritePrivileges()} must be successfully invoked before calling
* this method if the worker is configured to use a fencable producer for writes to the config topic.
* @param connector name of the connector to remove
* @throws IllegalStateException if {@link #claimWritePrivileges()} is required, but was not successfully invoked before
* this method was called
* @throws PrivilegedWriteException if the worker is configured to use a fencable producer for writes to the config topic
* and the write fails
public void removeConnectorConfig(String connector) {
log.debug("Removing connector configuration for connector '{}'", connector);
try {
Timer timer = time.timer(READ_WRITE_TOTAL_TIMEOUT_MS);
List<ProducerKeyValue> keyValues = Arrays.asList(
new ProducerKeyValue(CONNECTOR_KEY(connector), null),
new ProducerKeyValue(TARGET_STATE_KEY(connector), null)
sendPrivileged(keyValues, timer);
configLog.readToEnd().get(timer.remainingMs(), TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
log.error("Failed to remove connector configuration from Kafka: ", e);
throw new ConnectException("Error removing connector configuration from Kafka", e);
public void removeTaskConfigs(String connector) {
throw new UnsupportedOperationException("Removal of tasks is not currently supported");
* Write these task configurations and associated commit messages, unless an inconsistency is found that indicates
* that we would be leaving one of the referenced connectors with an inconsistent state. {@link #claimWritePrivileges()}
* must be successfully invoked before calling this method if the worker is configured to use a fencable producer for
* writes to the config topic.
* @param connector the connector to write task configuration
* @param configs list of task configurations for the connector
* @throws ConnectException if the task configurations do not resolve inconsistencies found in the existing root
* and task configurations.
* @throws IllegalStateException if {@link #claimWritePrivileges()} is required, but was not successfully invoked before
* this method was called
* @throws PrivilegedWriteException if the worker is configured to use a fencable producer for writes to the config topic
* and the write fails
public void putTaskConfigs(String connector, List<Map<String, String>> configs) {
Timer timer = time.timer(READ_WRITE_TOTAL_TIMEOUT_MS);
// Make sure we're at the end of the log. We should be the only writer, but we want to make sure we don't have
// any outstanding lagging data to consume.
try {
configLog.readToEnd().get(timer.remainingMs(), TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
log.error("Failed to write root configuration to Kafka: ", e);
throw new ConnectException("Error writing root configuration to Kafka", e);
int taskCount = configs.size();
// Send all the individual updates
int index = 0;
List<ProducerKeyValue> keyValues = new ArrayList<>();
for (Map<String, String> taskConfig: configs) {
Struct connectConfig = new Struct(TASK_CONFIGURATION_V0);
connectConfig.put("properties", taskConfig);
byte[] serializedConfig = converter.fromConnectData(topic, TASK_CONFIGURATION_V0, connectConfig);
log.debug("Writing configuration for connector '{}' task {}", connector, index);
ConnectorTaskId connectorTaskId = new ConnectorTaskId(connector, index);
keyValues.add(new ProducerKeyValue(TASK_KEY(connectorTaskId), serializedConfig));
try {
sendPrivileged(keyValues, timer);
} catch (ExecutionException | InterruptedException | TimeoutException e) {
log.error("Failed to write task configurations to Kafka", e);
throw new ConnectException("Error writing task configurations to Kafka", e);
// Finally, send the commit to update the number of tasks and apply the new configs, then wait until we read to
// the end of the log
try {
// Read to end to ensure all the task configs have been written
if (taskCount > 0) {
configLog.readToEnd().get(timer.remainingMs(), TimeUnit.MILLISECONDS);
// Write the commit message
Struct connectConfig = new Struct(CONNECTOR_TASKS_COMMIT_V0);
connectConfig.put("tasks", taskCount);
byte[] serializedConfig = converter.fromConnectData(topic, CONNECTOR_TASKS_COMMIT_V0, connectConfig);
log.debug("Writing commit for connector '{}' with {} tasks.", connector, taskCount);
sendPrivileged(COMMIT_TASKS_KEY(connector), serializedConfig, timer);
// Read to end to ensure all the commit messages have been written
configLog.readToEnd().get(timer.remainingMs(), TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
log.error("Failed to write root configuration to Kafka: ", e);
throw new ConnectException("Error writing root configuration to Kafka", e);
public void refresh(long timeout, TimeUnit unit) throws TimeoutException {
try {
configLog.readToEnd().get(timeout, unit);
} catch (InterruptedException | ExecutionException e) {
throw new ConnectException("Error trying to read to end of config log", e);
* Write a new {@link TargetState} for the connector. Note that {@link #claimWritePrivileges()} does not need to be
* invoked before invoking this method.
* @param connector the name of the connector
* @param state the desired target state for the connector
public void putTargetState(String connector, TargetState state) {
log.debug("Writing target state {} for connector {}", state, connector);
try {
configLog.sendWithReceipt(TARGET_STATE_KEY(connector), serializeTargetState(state))
} catch (InterruptedException | ExecutionException | TimeoutException e) {
log.error("Failed to write target state to Kafka", e);
throw new ConnectException("Error writing target state to Kafka", e);
private byte[] serializeTargetState(TargetState state) {
Struct connectTargetState = new Struct(TARGET_STATE_V1);
// Older workers don't support the STOPPED state; fall back on PAUSED
connectTargetState.put("state", state == STOPPED ? :;
return converter.fromConnectData(topic, TARGET_STATE_V1, connectTargetState);
* Write a task count record for a connector to persistent storage and wait until it has been acknowledged and read back by
* tailing the Kafka log with a consumer. {@link #claimWritePrivileges()} must be successfully invoked before calling this method
* if the worker is configured to use a fencable producer for writes to the config topic.
* @param connector name of the connector
* @param taskCount number of tasks used by the connector
* @throws IllegalStateException if {@link #claimWritePrivileges()} is required, but was not successfully invoked before
* this method was called
* @throws PrivilegedWriteException if the worker is configured to use a fencable producer for writes to the config topic
* and the write fails
public void putTaskCountRecord(String connector, int taskCount) {
Struct taskCountRecord = new Struct(TASK_COUNT_RECORD_V0);
taskCountRecord.put("task-count", taskCount);
byte[] serializedTaskCountRecord = converter.fromConnectData(topic, TASK_COUNT_RECORD_V0, taskCountRecord);
log.debug("Writing task count record {} for connector {}", taskCount, connector);
try {
Timer timer = time.timer(READ_WRITE_TOTAL_TIMEOUT_MS);
sendPrivileged(TASK_COUNT_RECORD_KEY(connector), serializedTaskCountRecord, timer);
configLog.readToEnd().get(timer.remainingMs(), TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
log.error("Failed to write task count record with {} tasks for connector {} to Kafka: ", taskCount, connector, e);
throw new ConnectException("Error writing task count record to Kafka", e);
* Write a session key to persistent storage and wait until it has been acknowledged and read back by tailing the Kafka log
* with a consumer. {@link #claimWritePrivileges()} must be successfully invoked before calling this method if the worker
* is configured to use a fencable producer for writes to the config topic.
* @param sessionKey the session key to distributed
* @throws IllegalStateException if {@link #claimWritePrivileges()} is required, but was not successfully invoked before
* this method was called
* @throws PrivilegedWriteException if the worker is configured to use a fencable producer for writes to the config topic
* and the write fails
public void putSessionKey(SessionKey sessionKey) {
log.debug("Distributing new session key");
Struct sessionKeyStruct = new Struct(SESSION_KEY_V0);
sessionKeyStruct.put("key", Base64.getEncoder().encodeToString(sessionKey.key().getEncoded()));
sessionKeyStruct.put("algorithm", sessionKey.key().getAlgorithm());
sessionKeyStruct.put("creation-timestamp", sessionKey.creationTimestamp());
byte[] serializedSessionKey = converter.fromConnectData(topic, SESSION_KEY_V0, sessionKeyStruct);
try {
Timer timer = time.timer(READ_WRITE_TOTAL_TIMEOUT_MS);
sendPrivileged(SESSION_KEY_KEY, serializedSessionKey, timer);
configLog.readToEnd().get(timer.remainingMs(), TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
log.error("Failed to write session key to Kafka: ", e);
throw new ConnectException("Error writing session key to Kafka", e);
* Write a restart request for the connector and optionally its tasks to persistent storage and wait until it has been
* acknowledged and read back by tailing the Kafka log with a consumer. {@link #claimWritePrivileges()} must be successfully
* invoked before calling this method if the worker is configured to use a fencable producer for writes to the config topic.
* @param restartRequest the restart request details
public void putRestartRequest(RestartRequest restartRequest) {
log.debug("Writing {} to Kafka", restartRequest);
String key = RESTART_KEY(restartRequest.connectorName());
Struct value = new Struct(RESTART_REQUEST_V0);
value.put(INCLUDE_TASKS_FIELD_NAME, restartRequest.includeTasks());
value.put(ONLY_FAILED_FIELD_NAME, restartRequest.onlyFailed());
byte[] serializedValue = converter.fromConnectData(topic, value.schema(), value);
try {
Timer timer = time.timer(READ_WRITE_TOTAL_TIMEOUT_MS);
sendPrivileged(key, serializedValue, timer);
configLog.readToEnd().get(timer.remainingMs(), TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
log.error("Failed to write {} to Kafka: ", restartRequest, e);
throw new ConnectException("Error writing " + restartRequest + " to Kafka", e);
public void putLoggerLevel(String namespace, String level) {
log.debug("Writing level {} for logging namespace {} to Kafka", level, namespace);
Struct value = new Struct(LOGGER_LEVEL_V0);
value.put("level", level);
byte[] serializedValue = converter.fromConnectData(topic, value.schema(), value);
try {
configLog.sendWithReceipt(LOGGER_CLUSTER_KEY(namespace), serializedValue).get(READ_WRITE_TOTAL_TIMEOUT_MS, TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
log.error("Failed to write logger level to Kafka", e);
throw new ConnectException("Error writing logger level to Kafka", e);
// package private for testing
KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, final WorkerConfig config) {
String clusterId = config.kafkaClusterId();
Map<String, Object> originals = config.originals();
Map<String, Object> producerProps = new HashMap<>(baseProducerProps);
producerProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId);
Map<String, Object> consumerProps = new HashMap<>(originals);
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
consumerProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId);
ConnectUtils.addMetricsContextProperties(consumerProps, config, clusterId);
if (config.exactlyOnceSourceEnabled()) {
consumerProps, ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString(),
"for the worker's config topic consumer when exactly-once source support is enabled",
Map<String, Object> adminProps = new HashMap<>(originals);
ConnectUtils.addMetricsContextProperties(adminProps, config, clusterId);
adminProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId);
Map<String, Object> topicSettings = config instanceof DistributedConfig
? ((DistributedConfig) config).configStorageTopicSettings()
: Collections.emptyMap();
NewTopic topicDescription = TopicAdmin.defineTopic(topic)
.config(topicSettings) // first so that we override user-supplied settings as needed
return createKafkaBasedLog(topic, producerProps, consumerProps, new ConsumeCallback(), topicDescription, topicAdminSupplier, config, time);
* Send a single record to the config topic synchronously. Note that {@link #claimWritePrivileges()} must be
* successfully invoked before calling this method if this store is configured to use a fencable writer.
* @param key the record key
* @param value the record value
* @param timer Timer bounding how long this method can block. The timer is updated before the method returns.
private void sendPrivileged(String key, byte[] value, Timer timer) throws ExecutionException, InterruptedException, TimeoutException {
sendPrivileged(Collections.singletonList(new ProducerKeyValue(key, value)), timer);
* Send one or more records to the config topic synchronously. Note that {@link #claimWritePrivileges()} must be
* successfully invoked before calling this method if this store is configured to use a fencable writer.
* @param keyValues the list of producer record key/value pairs
* @param timer Timer bounding how long this method can block. The timer is updated before the method returns.
private void sendPrivileged(List<ProducerKeyValue> keyValues, Timer timer) throws ExecutionException, InterruptedException, TimeoutException {
if (!usesFencableWriter) {
List<Future<RecordMetadata>> producerFutures = new ArrayList<>();
keyValue -> producerFutures.add(configLog.sendWithReceipt(keyValue.key, keyValue.value))
for (Future<RecordMetadata> future : producerFutures) {
future.get(timer.remainingMs(), TimeUnit.MILLISECONDS);
if (fencableProducer == null) {
throw new IllegalStateException("Cannot produce to config topic without claiming write privileges first");
try {
keyValue -> fencableProducer.send(new ProducerRecord<>(topic, keyValue.key, keyValue.value))
} catch (Exception e) {
log.warn("Failed to perform fencable send to config topic", e);
throw new PrivilegedWriteException("Failed to perform fencable send to config topic", e);
private static class ProducerKeyValue {
final String key;
final byte[] value;
ProducerKeyValue(String key, byte[] value) {
this.key = key;
this.value = value;
private void relinquishWritePrivileges() {
if (fencableProducer != null) {
Utils.closeQuietly(() -> fencableProducer.close(Duration.ZERO), "fencable producer for config topic");
fencableProducer = null;
protected String getTopicConfig() {
return DistributedConfig.CONFIG_TOPIC_CONFIG;
protected String getTopicPurpose() {
return "connector configurations";
private class ConsumeCallback implements Callback<ConsumerRecord<String, byte[]>> {
public void onCompletion(Throwable error, ConsumerRecord<String, byte[]> record) {
if (error != null) {
log.error("Unexpected in consumer callback for KafkaConfigBackingStore: ", error);
final SchemaAndValue value;
try {
value = converter.toConnectData(topic, record.value());
} catch (DataException e) {
log.error("Failed to convert config data to Kafka Connect format: ", e);
// Make the recorded offset match the API used for positions in the consumer -- return the offset of the
// *next record*, not the last one consumed.
offset = record.offset() + 1;
if (record.key().startsWith(TARGET_STATE_PREFIX)) {
String connectorName = record.key().substring(TARGET_STATE_PREFIX.length());
processTargetStateRecord(connectorName, value);
} else if (record.key().startsWith(CONNECTOR_PREFIX)) {
String connectorName = record.key().substring(CONNECTOR_PREFIX.length());
processConnectorConfigRecord(connectorName, value);
} else if (record.key().startsWith(TASK_PREFIX)) {
ConnectorTaskId taskId = parseTaskId(record.key());
if (taskId == null) {
log.error("Ignoring task configuration because {} couldn't be parsed as a task config key", record.key());
processTaskConfigRecord(taskId, value);
} else if (record.key().startsWith(COMMIT_TASKS_PREFIX)) {
String connectorName = record.key().substring(COMMIT_TASKS_PREFIX.length());
processTasksCommitRecord(connectorName, value);
} else if (record.key().startsWith(RESTART_PREFIX)) {
RestartRequest request = recordToRestartRequest(record, value);
// Only notify the listener if this backing store is already successfully started (having caught up the first time)
if (request != null && started) {
} else if (record.key().startsWith(TASK_COUNT_RECORD_PREFIX)) {
String connectorName = record.key().substring(TASK_COUNT_RECORD_PREFIX.length());
processTaskCountRecord(connectorName, value);
} else if (record.key().equals(SESSION_KEY_KEY)) {
} else if (record.key().startsWith(LOGGER_CLUSTER_PREFIX)) {
String loggingNamespace = record.key().substring(LOGGER_CLUSTER_PREFIX.length());
processLoggerLevelRecord(loggingNamespace, value);
} else {
log.error("Discarding config update record with invalid key: {}", record.key());
private void processTargetStateRecord(String connectorName, SchemaAndValue value) {
boolean removed = false;
boolean stateChanged = true;
synchronized (lock) {
if (value.value() == null) {
// When connector configs are removed, we also write tombstones for the target state.
log.debug("Removed target state for connector {} due to null value in topic.", connectorName);
removed = true;
// If for some reason we still have configs for the connector, add back the default
// STARTED state to ensure each connector always has a valid target state.
if (connectorConfigs.containsKey(connectorName))
connectorTargetStates.put(connectorName, TargetState.STARTED);
} else {
if (!(value.value() instanceof Map)) {
log.error("Ignoring target state for connector '{}' because it is in the wrong format: {}", connectorName, className(value.value()));
Map<String, Object> valueMap = (Map<String, Object>) value.value();
Object targetState = valueMap.get("state.v2");
if (targetState != null && !(targetState instanceof String)) {
log.error("Invalid data for target state for connector '{}': 'state.v2' field should be a String but is {}",
connectorName, className(targetState));
// We don't return here; it's still possible that there's a value we can use in the older state field
targetState = null;
if (targetState == null) {
// This record may have been written by an older worker; fall back on the older state field
targetState = valueMap.get("state");
if (!(targetState instanceof String)) {
log.error("Invalid data for target state for connector '{}': 'state' field should be a String but is {}",
connectorName, className(targetState));
try {
TargetState state = TargetState.valueOf((String) targetState);
log.debug("Setting target state for connector '{}' to {}", connectorName, targetState);
TargetState prevState = connectorTargetStates.put(connectorName, state);
stateChanged = !state.equals(prevState);
} catch (IllegalArgumentException e) {
log.error("Invalid target state for connector '{}': {}", connectorName, targetState);
// Note that we do not notify the update listener if the target state has been removed.
// Instead we depend on the removal callback of the connector config itself to notify the worker.
// We also don't notify the update listener if the connector doesn't exist yet - a scenario that can
// occur if an explicit initial target state is specified in the connector creation request.
if (started && !removed && stateChanged && connectorConfigs.containsKey(connectorName))
private void processConnectorConfigRecord(String connectorName, SchemaAndValue value) {
boolean removed = false;
synchronized (lock) {
if (value.value() == null) {
// Connector deletion will be written as a null value"Successfully processed removal of connector '{}'", connectorName);
taskConfigs.keySet().removeIf(taskId -> taskId.connector().equals(connectorName));
removed = true;
} else {
// Connector configs can be applied and callbacks invoked immediately
if (!(value.value() instanceof Map)) {
log.error("Ignoring configuration for connector '{}' because it is in the wrong format: {}", connectorName, className(value.value()));
Object newConnectorConfig = ((Map<String, Object>) value.value()).get("properties");
if (!(newConnectorConfig instanceof Map)) {
log.error("Invalid data for config for connector '{}': 'properties' field should be a Map but is {}",
connectorName, className(newConnectorConfig));
log.debug("Updating configuration for connector '{}'", connectorName);
Map<String, String> stringsConnectorConfig = (Map<String, String>) newConnectorConfig;
connectorConfigs.put(connectorName, stringsConnectorConfig);
// Set the initial state of the connector to STARTED, which ensures that any connectors
// which were created with 0.9 Connect will be initialized in the STARTED state.
if (!connectorTargetStates.containsKey(connectorName))
connectorTargetStates.put(connectorName, TargetState.STARTED);
if (started) {
if (removed)
private void processTaskConfigRecord(ConnectorTaskId taskId, SchemaAndValue value) {
synchronized (lock) {
if (value.value() == null) {
log.error("Ignoring task configuration for task {} because it is unexpectedly null", taskId);
if (!(value.value() instanceof Map)) {
log.error("Ignoring task configuration for task {} because the value is not a Map but is {}", taskId, className(value.value()));
Object newTaskConfig = ((Map<String, Object>) value.value()).get("properties");
if (!(newTaskConfig instanceof Map)) {
log.error("Invalid data for config of task {} 'properties' field should be a Map but is {}", taskId, className(newTaskConfig));
Map<ConnectorTaskId, Map<String, String>> deferred = deferredTaskUpdates.computeIfAbsent(taskId.connector(), k -> new HashMap<>());
log.debug("Storing new config for task {}; this will wait for a commit message before the new config will take effect.", taskId);
Map<String, String> stringsTaskConfig = (Map<String, String>) newTaskConfig;
deferred.put(taskId, stringsTaskConfig);
private void processTasksCommitRecord(String connectorName, SchemaAndValue value) {
List<ConnectorTaskId> updatedTasks = new ArrayList<>();
synchronized (lock) {
// Apply any outstanding deferred task updates for the given connector. Note that just because we
// encounter a commit message does not mean it will result in consistent output. In particular due to
// compaction, there may be cases where . For example if we have the following sequence of writes:
// 1. Write connector "foo"'s config
// 2. Write connector "foo", task 1's config <-- compacted
// 3. Write connector "foo", task 2's config
// 4. Write connector "foo" task commit message
// 5. Write connector "foo", task 1's config
// 6. Write connector "foo", task 2's config
// 7. Write connector "foo" task commit message
// then when a new worker starts up, if message 2 had been compacted, then when message 4 is applied
// "foo" will not have a complete set of configs. Only when message 7 is applied will the complete
// configuration be available. Worse, if the leader died while writing messages 5, 6, and 7 such that
// only 5 was written, then there may be nothing that will finish writing the configs and get the
// log back into a consistent state.
// It is expected that the user of this class (i.e., the Herder) will take the necessary action to
// resolve this (i.e., get the connector to recommit its configuration). This inconsistent state is
// exposed in the snapshots provided via ClusterConfigState so they are easy to handle.
if (!(value.value() instanceof Map)) { // Schema-less, so we get maps instead of structs
log.error("Ignoring connector tasks configuration commit for connector '{}' because it is in the wrong format: {}", connectorName, className(value.value()));
Map<ConnectorTaskId, Map<String, String>> deferred = deferredTaskUpdates.get(connectorName);
int newTaskCount = intValue(((Map<String, Object>) value.value()).get("tasks"));
// Validate the configs we're supposed to update to ensure we're getting a complete configuration
// update of all tasks that are expected based on the number of tasks in the commit message.
Set<Integer> taskIdSet = taskIds(connectorName, deferred);
if (!completeTaskIdSet(taskIdSet, newTaskCount)) {
// Given the logic for writing commit messages, we should only hit this condition due to compacted
// historical data, in which case we would not have applied any updates yet and there will be no
// task config data already committed for the connector, so we shouldn't have to clear any data
// out. All we need to do is add the flag marking it inconsistent.
log.debug("We have an incomplete set of task configs for connector '{}' probably due to compaction. So we are not doing anything with the new configuration.", connectorName);
} else {
if (deferred != null) {
connectorTaskConfigGenerations.compute(connectorName, (ignored, generation) -> generation != null ? generation + 1 : 0);
// Always clear the deferred entries, even if we didn't apply them. If they represented an inconsistent
// update, then we need to see a completely fresh set of configs after this commit message, so we don't
// want any of these outdated configs
if (deferred != null)
connectorTaskCounts.put(connectorName, newTaskCount);
// If task configs appear after the latest task count record, the connector needs a new round of zombie fencing
// before it can start tasks with these configs
if (started)
RestartRequest recordToRestartRequest(ConsumerRecord<String, byte[]> record, SchemaAndValue value) {
String connectorName = record.key().substring(RESTART_PREFIX.length());
if (!(value.value() instanceof Map)) {
log.error("Ignoring restart request because the value is not a Map but is {}", className(value.value()));
return null;
Map<String, Object> valueAsMap = (Map<String, Object>) value.value();
Object failed = valueAsMap.get(ONLY_FAILED_FIELD_NAME);
boolean onlyFailed;
if (!(failed instanceof Boolean)) {
log.warn("Invalid data for restart request '{}' field should be a Boolean but is {}, defaulting to {}", ONLY_FAILED_FIELD_NAME, className(failed), ONLY_FAILED_DEFAULT);
} else {
onlyFailed = (Boolean) failed;
Object withTasks = valueAsMap.get(INCLUDE_TASKS_FIELD_NAME);
boolean includeTasks;
if (!(withTasks instanceof Boolean)) {
log.warn("Invalid data for restart request '{}' field should be a Boolean but is {}, defaulting to {}", INCLUDE_TASKS_FIELD_NAME, className(withTasks), INCLUDE_TASKS_DEFAULT);
} else {
includeTasks = (Boolean) withTasks;
return new RestartRequest(connectorName, onlyFailed, includeTasks);
private void processTaskCountRecord(String connectorName, SchemaAndValue value) {
if (!(value.value() instanceof Map)) {
log.error("Ignoring task count record for connector '{}' because it is in the wrong format: {}", connectorName, className(value.value()));
int taskCount = intValue(((Map<String, Object>) value.value()).get("task-count"));
log.debug("Setting task count record for connector '{}' to {}", connectorName, taskCount);
connectorTaskCountRecords.put(connectorName, taskCount);
// If a task count record appears after the latest task configs, the connectors doesn't need a round of zombie
// fencing before it can start tasks with the latest configs
private void processSessionKeyRecord(SchemaAndValue value) {
if (value.value() == null) {
log.error("Ignoring session key because it is unexpectedly null");
if (!(value.value() instanceof Map)) {
log.error("Ignoring session key because the value is not a Map but is {}", className(value.value()));
Map<String, Object> valueAsMap = (Map<String, Object>) value.value();
Object sessionKey = valueAsMap.get("key");
if (!(sessionKey instanceof String)) {
log.error("Invalid data for session key 'key' field should be a String but is {}", className(sessionKey));
byte[] key = Base64.getDecoder().decode((String) sessionKey);
Object keyAlgorithm = valueAsMap.get("algorithm");
if (!(keyAlgorithm instanceof String)) {
log.error("Invalid data for session key 'algorithm' field should be a String but it is {}", className(keyAlgorithm));
Object creationTimestamp = valueAsMap.get("creation-timestamp");
if (!(creationTimestamp instanceof Long)) {
log.error("Invalid data for session key 'creation-timestamp' field should be a long but it is {}", className(creationTimestamp));
KafkaConfigBackingStore.this.sessionKey = new SessionKey(
new SecretKeySpec(key, (String) keyAlgorithm),
(long) creationTimestamp
if (started)
private void processLoggerLevelRecord(String namespace, SchemaAndValue value) {
if (value.value() == null) {
log.error("Ignoring logging level for namespace {} because it is unexpectedly null", namespace);
if (!(value.value() instanceof Map)) {
log.error("Ignoring logging level for namespace {} because the value is not a Map but is {}", namespace, className(value.value()));
Map<String, Object> valueAsMap = (Map<String, Object>) value.value();
Object level = valueAsMap.get("level");
if (!(level instanceof String)) {
log.error("Invalid data for logging level key 'level' field with namespace {}; should be a String but it is {}", namespace, className(level));
if (started) {
updateListener.onLoggingLevelUpdate(namespace, (String) level);
} else {
// TRACE level since there may be many of these records in the config topic
"Ignoring old logging level {} for namespace {} that was writen to the config topic before this worker completed startup",
private ConnectorTaskId parseTaskId(String key) {
String[] parts = key.split("-");
if (parts.length < 3) return null;
try {
int taskNum = Integer.parseInt(parts[parts.length - 1]);
String connectorName = String.join("-", Arrays.copyOfRange(parts, 1, parts.length - 1));
return new ConnectorTaskId(connectorName, taskNum);
} catch (NumberFormatException e) {
return null;
* Given task configurations, get a set of integer task IDs for the connector.
private Set<Integer> taskIds(String connector, Map<ConnectorTaskId, Map<String, String>> configs) {
Set<Integer> tasks = new TreeSet<>();
if (configs == null) {
return tasks;
for (ConnectorTaskId taskId : configs.keySet()) {
assert taskId.connector().equals(connector);
return tasks;
private boolean completeTaskIdSet(Set<Integer> idSet, int expectedSize) {
// Note that we do *not* check for the exact set. This is an important implication of compaction. If we start out
// with 2 tasks, then reduce to 1, we'll end up with log entries like:
// 1. Connector "foo" config
// 2. Connector "foo", task 1 config
// 3. Connector "foo", task 2 config
// 4. Connector "foo", commit 2 tasks
// 5. Connector "foo", task 1 config
// 6. Connector "foo", commit 1 tasks
// However, due to compaction we could end up with a log that looks like this:
// 1. Connector "foo" config
// 3. Connector "foo", task 2 config
// 5. Connector "foo", task 1 config
// 6. Connector "foo", commit 1 tasks
// which isn't incorrect, but would appear in this code to have an extra task configuration. Instead, we just
// validate that all the configs specified by the commit message are present. This should be fine because the
// logic for writing configs ensures all the task configs are written (and reads them back) before writing the
// commit message.
if (idSet.size() < expectedSize)
return false;
for (int i = 0; i < expectedSize; i++)
if (!idSet.contains(i))
return false;
return true;
// Convert an integer value extracted from a schemaless struct to an int. This handles potentially different
// encodings by different Converters.
private static int intValue(Object value) {
if (value instanceof Integer)
return (int) value;
else if (value instanceof Long)
return (int) (long) value;
throw new ConnectException("Expected integer value to be either Integer or Long");