blob: 8d2028852ecf8c67c5cfbed39424ae03330da6ce [file] [log] [blame]
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 * <p/>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p/>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
 * <p>
 * Provides persistent storage of Kafka Connect connector configurations in a Kafka topic.
 * </p>
 * <p>
 * This class manages both connector and task configurations. It tracks three types of configuration entries:
 * <p/>
 * 1. Connector config: map of string -> string configurations passed to the Connector class, with support for
 * expanding this format if necessary. (Kafka key: connector-[connector-id]).
 * These configs are *not* ephemeral. They represent the source of truth. If the entire Connect
 * cluster goes down, this is all that is really needed to recover.
 * 2. Task configs: map of string -> string configurations passed to the Task class, with support for expanding
 * this format if necessary. (Kafka key: task-[connector-id]-[task-id]).
 * These configs are ephemeral; they are stored here to a) disseminate them to all workers while
 * ensuring agreement and b) to allow faster cluster/worker recovery since the common case
 * of recovery (restoring a connector) will simply result in the same configuration as before
 * the failure.
 * 3. Task commit "configs": records indicating that previous task config entries should be committed and all task
 * configs for a connector can be applied. (Kafka key: commit-[connector-id]).
 * This config has two effects. First, it records the number of tasks the connector is currently
 * running (and can therefore increase/decrease parallelism). Second, because each task config
 * is stored separately but they need to be applied together to ensure each partition is assigned
 * to a single task, this record also indicates that task configs for the specified connector
 * can be "applied" or "committed".
 * </p>
 * <p>
 * This configuration is expected to be stored in a *single partition* and *compacted* topic. Using a single partition
 * ensures we can enforce ordering on messages, allowing Kafka to be used as a write ahead log. Compaction allows
 * us to clean up outdated configurations over time. However, this combination has some important implications for
 * the implementation of this class and the configuration state that it may expose.
 * </p>
 * <p>
 * Connector configurations are independent of all other configs, so they are handled easily. Writing a single record
 * is already atomic, so these can be applied as soon as they are read. One connector's config does not affect any
 * others, and they do not need to coordinate with the connector's task configuration at all.
 * </p>
 * <p>
 * The most obvious implication for task configs is the need for the commit messages. Because Kafka does not
 * currently have multi-record transactions or support atomic batch record writes, task commit messages are required
 * to ensure that readers do not end up using inconsistent configs. For example, consider if a connector wrote configs
 * for its tasks, then was reconfigured and only managed to write updated configs for half its tasks. If task configs
 * were applied immediately you could be using half the old configs and half the new configs. In that condition, some
 * partitions may be double-assigned because the old config and new config may use completely different assignments.
 * Therefore, when reading the log, we must buffer config updates for a connector's tasks and only apply them
 * atomically once a commit message has been read.
 * </p>
 * <p>
 * However, there are also further challenges. This simple buffering approach would work fine as long as the entire log was
 * always available, but we would like to be able to enable compaction so our configuration topic does not grow
 * indefinitely. Compaction may break a normal log because old entries will suddenly go missing. A new worker reading
 * from the beginning of the log in order to build up the full current configuration will see task commits, but some
 * records required for those commits will have been removed because the same keys have subsequently been rewritten.
 * For example, if you have a sequence of record keys [connector-foo-config, task-foo-1-config, task-foo-2-config,
 * commit-foo (2 tasks), task-foo-1-config, commit-foo (1 task)], we can end up with a compacted log containing
 * [connector-foo-config, task-foo-2-config, commit-foo (2 tasks), task-foo-1-config, commit-foo (1 task)]. When read
 * back, the first commit will see an invalid state because the first task-foo-1-config has been cleaned up.
 * </p>
 * <p>
 * Compaction can further complicate things if writing new task configs fails mid-write. Consider a similar scenario
 * as the previous one, but in this case both the first and second update will write 2 task configs. However, the
 * second write fails half of the way through:
 * [connector-foo-config, task-foo-1-config, task-foo-2-config, commit-foo (2 tasks), task-foo-1-config]. Now compaction
 * occurs and we're left with
 * [connector-foo-config, task-foo-2-config, commit-foo (2 tasks), task-foo-1-config]. At the first commit, we don't
 * have a complete set of configs. And because of the failure, there is no second commit. We are left in an inconsistent
 * state with no obvious way to resolve the issue -- we can try to keep on reading, but the failed node may never
 * recover and write the updated config. Meanwhile, other workers may have seen the entire log; they will see the second
 * task-foo-1-config waiting to be applied, but will otherwise think everything is ok -- they have a valid set of task
 * configs for connector "foo".
 * </p>
 * <p>
 * Because we can encounter these inconsistencies and addressing them requires support from the rest of the system
 * (resolving the task configuration inconsistencies requires support from the connector instance to regenerate updated
 * configs), this class exposes not only the current set of configs, but also which connectors have inconsistent data.
 * This allows users of this class (i.e., Herder implementations) to take action to resolve any inconsistencies. These
 * inconsistencies should be rare (as described above, due to compaction combined with leader failures in the middle
 * of updating task configurations).
 * </p>
 * <p>
 * Note that the expectation is that this config storage system has only a single writer at a time.
 * The caller (Herder) must ensure this is the case. In distributed mode this will require forwarding config change
 * requests to the leader in the cluster (i.e. the worker group coordinated by the Kafka broker).
 * </p>
 * <p>
 * Since processing of the config log occurs in a background thread, callers must take care when using accessors.
 * To simplify handling this correctly, this class only exposes a mechanism to snapshot the current state of the cluster.
 * Updates may continue to be applied (and callbacks invoked) in the background. Callers must take care that they are
 * using a consistent snapshot and only update when it is safe. In particular, if task configs are updated which require
 * synchronization across workers to commit offsets and update the configuration, callbacks and updates during the
 * rebalance must be deferred.
 * </p>
 */
public class KafkaConfigBackingStore implements ConfigBackingStore {
private static final Logger log = LoggerFactory.getLogger(KafkaConfigBackingStore.class);
public static final String TARGET_STATE_PREFIX = "target-state-";
public static String TARGET_STATE_KEY(String connectorName) {
return TARGET_STATE_PREFIX + connectorName;
public static final String CONNECTOR_PREFIX = "connector-";
public static String CONNECTOR_KEY(String connectorName) {
return CONNECTOR_PREFIX + connectorName;
public static final String TASK_PREFIX = "task-";
public static String TASK_KEY(ConnectorTaskId taskId) {
return TASK_PREFIX + taskId.connector() + "-" + taskId.task();
public static final String COMMIT_TASKS_PREFIX = "commit-";
public static String COMMIT_TASKS_KEY(String connectorName) {
return COMMIT_TASKS_PREFIX + connectorName;
// Note that while using real serialization for values as we have here, but ad hoc string serialization for keys,
// isn't ideal, we use this approach because it avoids any potential problems with schema evolution or
// converter/serializer changes causing keys to change. We need to absolutely ensure that the keys remain precisely
// the same.
public static final Schema CONNECTOR_CONFIGURATION_V0 = SchemaBuilder.struct()
.field("properties",, Schema.OPTIONAL_STRING_SCHEMA))
public static final Schema CONNECTOR_TASKS_COMMIT_V0 = SchemaBuilder.struct()
.field("tasks", Schema.INT32_SCHEMA)
public static final Schema TARGET_STATE_V0 = SchemaBuilder.struct()
.field("state", Schema.STRING_SCHEMA)
// Upper bound, in milliseconds, on each wait for the config log to be read to its end.
private static final long READ_TO_END_TIMEOUT_MS = 30000;
// Monitor guarding the in-memory config state below; it is taken both by the public accessors and by the
// callback that consumes records from the config log.
private final Object lock;
// Set only while start() is running. Per the note in start(), callbacks are not invoked during startup.
private boolean starting;
// Translates config values between Connect data and the serialized byte[] form stored in the log.
private final Converter converter;
// Listener registered through setUpdateListener().
private UpdateListener updateListener;
// Name of the config topic, read from DistributedConfig.CONFIG_TOPIC_CONFIG in configure().
private String topic;
// Data is passed to the log already serialized. We use a converter to handle translating to/from generic Connect
// format to serialized form
private KafkaBasedLog<String, byte[]> configLog;
// Connector -> # of tasks
private Map<String, Integer> connectorTaskCounts = new HashMap<>();
// Connector and task configs: name or id -> config map
private Map<String, Map<String, String>> connectorConfigs = new HashMap<>();
private Map<ConnectorTaskId, Map<String, String>> taskConfigs = new HashMap<>();
// Set of connectors where we saw a task commit with an incomplete set of task config updates, indicating the data
// is in an inconsistent state and we cannot safely use them until they have been refreshed.
private Set<String> inconsistent = new HashSet<>();
// The most recently read offset. This does not take into account deferred task updates/commits, so we may have
// outstanding data to be applied.
private volatile long offset;
// Connector -> Map[ConnectorTaskId -> Configs]
private final Map<String, Map<ConnectorTaskId, Map<String, String>>> deferredTaskUpdates = new HashMap<>();
// Connector -> target state. A connector whose config is read before any target state record is given
// TargetState.STARTED by the consume callback.
private final Map<String, TargetState> connectorTargetStates = new HashMap<>();
/**
 * Creates a store that is not yet configured or started.
 *
 * @param converter converter used to translate config values to and from their serialized form
 */
public KafkaConfigBackingStore(Converter converter) {
    this.lock = new Object();
    this.starting = false;
    this.converter = converter;
    // No record has been read yet.
    this.offset = -1;
}
public void setUpdateListener(UpdateListener listener) {
this.updateListener = listener;
public void configure(WorkerConfig config) {
topic = config.getString(DistributedConfig.CONFIG_TOPIC_CONFIG);
if (topic.equals(""))
throw new ConfigException("Must specify topic for connector configuration.");
Map<String, Object> producerProps = new HashMap<>();
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
Map<String, Object> consumerProps = new HashMap<>();
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
configLog = createKafkaBasedLog(topic, producerProps, consumerProps, new ConsumeCallback());
public void start() {"Starting KafkaConfigBackingStore");
// During startup, callbacks are *not* invoked. You can grab a snapshot after starting -- just take care that
// updates can continue to occur in the background
starting = true;
starting = false;"Started KafkaConfigBackingStore");
public void stop() {"Closing KafkaConfigBackingStore");
configLog.stop();"Closed KafkaConfigBackingStore");
* Get a snapshot of the current state of the cluster.
public ClusterConfigState snapshot() {
synchronized (lock) {
// Doing a shallow copy of the data is safe here because the complex nested data that is copied should all be
// immutable configs
return new ClusterConfigState(
new HashMap<>(connectorTaskCounts),
new HashMap<>(connectorConfigs),
new HashMap<>(connectorTargetStates),
new HashMap<>(taskConfigs),
new HashSet<>(inconsistent)
public boolean contains(String connector) {
synchronized (lock) {
return connectorConfigs.containsKey(connector);
* Write this connector configuration to persistent storage and wait until it has been acknowledge and read back by
* tailing the Kafka log with a consumer.
* @param connector name of the connector to write data for
* @param properties the configuration to write
public void putConnectorConfig(String connector, Map<String, String> properties) {
log.debug("Writing connector configuration {} for connector {} configuration", properties, connector);
Struct connectConfig = new Struct(CONNECTOR_CONFIGURATION_V0);
connectConfig.put("properties", properties);
byte[] serializedConfig = converter.fromConnectData(topic, CONNECTOR_CONFIGURATION_V0, connectConfig);
updateConnectorConfig(connector, serializedConfig);
* Remove configuration for a given connector.
* @param connector name of the connector to remove
public void removeConnectorConfig(String connector) {
log.debug("Removing connector configuration for connector {}", connector);
updateConnectorConfig(connector, null);
configLog.send(TARGET_STATE_KEY(connector), null);
public void removeTaskConfigs(String connector) {
throw new UnsupportedOperationException("Removal of tasks is not currently supported");
private void updateConnectorConfig(String connector, byte[] serializedConfig) {
try {
configLog.send(CONNECTOR_KEY(connector), serializedConfig);
configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
log.error("Failed to write connector configuration to Kafka: ", e);
throw new ConnectException("Error writing connector configuration to Kafka", e);
* Write these task configurations and associated commit messages, unless an inconsistency is found that indicates
* that we would be leaving one of the referenced connectors with an inconsistent state.
* @param configs map containing task configurations
* @throws ConnectException if the task configurations do not resolve inconsistencies found in the existing root
* and task configurations.
public void putTaskConfigs(String connector, Map<ConnectorTaskId, Map<String, String>> configs) {
// Make sure we're at the end of the log. We should be the only writer, but we want to make sure we don't have
// any outstanding lagging data to consume.
try {
configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
log.error("Failed to write root configuration to Kafka: ", e);
throw new ConnectException("Error writing root configuration to Kafka", e);
// In theory, there is only a single writer and we shouldn't need this lock since the background thread should
// not invoke any callbacks that would conflict, but in practice this guards against inconsistencies due to
// the root config being updated.
Map<String, Integer> newTaskCounts = new HashMap<>();
synchronized (lock) {
// Validate tasks in this assignment. Any task configuration updates should include updates for *all* tasks
// in the connector -- we should have all task IDs 0 - N-1 within a connector if any task is included here
Map<String, Set<Integer>> updatedConfigIdsByConnector = taskIdsByConnector(configs);
for (Map.Entry<String, Set<Integer>> taskConfigSetEntry : updatedConfigIdsByConnector.entrySet()) {
if (!completeTaskIdSet(taskConfigSetEntry.getValue(), taskConfigSetEntry.getValue().size())) {
log.error("Submitted task configuration contain invalid range of task IDs, ignoring this submission");
throw new ConnectException("Error writing task configurations: found some connectors with invalid connectors");
newTaskCounts.put(taskConfigSetEntry.getKey(), taskConfigSetEntry.getValue().size());
// Start sending all the individual updates
for (Map.Entry<ConnectorTaskId, Map<String, String>> taskConfigEntry : configs.entrySet()) {
Struct connectConfig = new Struct(TASK_CONFIGURATION_V0);
connectConfig.put("properties", taskConfigEntry.getValue());
byte[] serializedConfig = converter.fromConnectData(topic, TASK_CONFIGURATION_V0, connectConfig);
log.debug("Writing configuration for task " + taskConfigEntry.getKey() + " configuration: " + taskConfigEntry.getValue());
configLog.send(TASK_KEY(taskConfigEntry.getKey()), serializedConfig);
// Finally, send the commit to update the number of tasks and apply the new configs, then wait until we read to
// the end of the log
try {
// Read to end to ensure all the task configs have been written
configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
// Write all the commit messages
for (Map.Entry<String, Integer> taskCountEntry : newTaskCounts.entrySet()) {
Struct connectConfig = new Struct(CONNECTOR_TASKS_COMMIT_V0);
connectConfig.put("tasks", taskCountEntry.getValue());
byte[] serializedConfig = converter.fromConnectData(topic, CONNECTOR_TASKS_COMMIT_V0, connectConfig);
log.debug("Writing commit for connector " + taskCountEntry.getKey() + " with " + taskCountEntry.getValue() + " tasks.");
configLog.send(COMMIT_TASKS_KEY(taskCountEntry.getKey()), serializedConfig);
// Read to end to ensure all the commit messages have been written
configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
log.error("Failed to write root configuration to Kafka: ", e);
throw new ConnectException("Error writing root configuration to Kafka", e);
public void refresh(long timeout, TimeUnit unit) throws TimeoutException {
try {
configLog.readToEnd().get(timeout, unit);
} catch (InterruptedException | ExecutionException e) {
throw new ConnectException("Error trying to read to end of config log", e);
public void putTargetState(String connector, TargetState state) {
Struct connectTargetState = new Struct(TARGET_STATE_V0);
byte[] serializedTargetState = converter.fromConnectData(topic, TARGET_STATE_V0, connectTargetState);
log.debug("Writing target state {} for connector {}", state, connector);
configLog.send(TARGET_STATE_KEY(connector), serializedTargetState);
private KafkaBasedLog<String, byte[]> createKafkaBasedLog(String topic, Map<String, Object> producerProps,
Map<String, Object> consumerProps, Callback<ConsumerRecord<String, byte[]>> consumedCallback) {
return new KafkaBasedLog<>(topic, producerProps, consumerProps, consumedCallback, new SystemTime());
private class ConsumeCallback implements Callback<ConsumerRecord<String, byte[]>> {
public void onCompletion(Throwable error, ConsumerRecord<String, byte[]> record) {
if (error != null) {
log.error("Unexpected in consumer callback for KafkaConfigBackingStore: ", error);
final SchemaAndValue value;
try {
value = converter.toConnectData(topic, record.value());
} catch (DataException e) {
log.error("Failed to convert config data to Kafka Connect format: ", e);
// Make the recorded offset match the API used for positions in the consumer -- return the offset of the
// *next record*, not the last one consumed.
offset = record.offset() + 1;
if (record.key().startsWith(TARGET_STATE_PREFIX)) {
String connectorName = record.key().substring(TARGET_STATE_PREFIX.length());
synchronized (lock) {
if (value.value() != null) {
if (!(value.value() instanceof Map)) {
log.error("Found target state ({}) in wrong format: {}", record.key(), value.value().getClass());
Object targetState = ((Map<String, Object>) value.value()).get("state");
if (!(targetState instanceof String)) {
log.error("Invalid data for target state for connector ({}): 'state' field should be a Map but is {}",
connectorName, targetState == null ? null : targetState.getClass());
try {
TargetState state = TargetState.valueOf((String) targetState);
log.trace("Setting target state for connector {} to {}", connectorName, targetState);
connectorTargetStates.put(connectorName, state);
} catch (IllegalArgumentException e) {
log.error("Invalid target state for connector ({}): {}", connectorName, targetState);
if (!starting)
} else if (record.key().startsWith(CONNECTOR_PREFIX)) {
String connectorName = record.key().substring(CONNECTOR_PREFIX.length());
boolean removed = false;
synchronized (lock) {
if (value.value() == null) {
// Connector deletion will be written as a null value"Removed connector " + connectorName + " due to null configuration. This is usually intentional and does not indicate an issue.");
removed = true;
} else {
// Connector configs can be applied and callbacks invoked immediately
if (!(value.value() instanceof Map)) {
log.error("Found connector configuration (" + record.key() + ") in wrong format: " + value.value().getClass());
Object newConnectorConfig = ((Map<String, Object>) value.value()).get("properties");
if (!(newConnectorConfig instanceof Map)) {
log.error("Invalid data for connector config ({}): properties field should be a Map but is {}", connectorName,
newConnectorConfig == null ? null : newConnectorConfig.getClass());
log.debug("Updating configuration for connector " + connectorName + " configuration: " + newConnectorConfig);
connectorConfigs.put(connectorName, (Map<String, String>) newConnectorConfig);
if (!connectorTargetStates.containsKey(connectorName))
connectorTargetStates.put(connectorName, TargetState.STARTED);
if (!starting) {
if (removed)
} else if (record.key().startsWith(TASK_PREFIX)) {
synchronized (lock) {
ConnectorTaskId taskId = parseTaskId(record.key());
if (taskId == null) {
log.error("Ignoring task configuration because " + record.key() + " couldn't be parsed as a task config key");
if (!(value.value() instanceof Map)) {
log.error("Ignoring task configuration for task " + taskId + " because it is in the wrong format: " + value.value());
Object newTaskConfig = ((Map<String, Object>) value.value()).get("properties");
if (!(newTaskConfig instanceof Map)) {
log.error("Invalid data for task config (" + taskId + "): properties filed should be a Map but is " + newTaskConfig.getClass());
Map<ConnectorTaskId, Map<String, String>> deferred = deferredTaskUpdates.get(taskId.connector());
if (deferred == null) {
deferred = new HashMap<>();
deferredTaskUpdates.put(taskId.connector(), deferred);
log.debug("Storing new config for task " + taskId + " this will wait for a commit message before the new config will take effect. New config: " + newTaskConfig);
deferred.put(taskId, (Map<String, String>) newTaskConfig);
} else if (record.key().startsWith(COMMIT_TASKS_PREFIX)) {
String connectorName = record.key().substring(COMMIT_TASKS_PREFIX.length());
List<ConnectorTaskId> updatedTasks = new ArrayList<>();
synchronized (lock) {
// Apply any outstanding deferred task updates for the given connector. Note that just because we
// encounter a commit message does not mean it will result in consistent output. In particular due to
// compaction, there may be cases where . For example if we have the following sequence of writes:
// 1. Write connector "foo"'s config
// 2. Write connector "foo", task 1's config <-- compacted
// 3. Write connector "foo", task 2's config
// 4. Write connector "foo" task commit message
// 5. Write connector "foo", task 1's config
// 6. Write connector "foo", task 2's config
// 7. Write connector "foo" task commit message
// then when a new worker starts up, if message 2 had been compacted, then when message 4 is applied
// "foo" will not have a complete set of configs. Only when message 7 is applied will the complete
// configuration be available. Worse, if the leader died while writing messages 5, 6, and 7 such that
// only 5 was written, then there may be nothing that will finish writing the configs and get the
// log back into a consistent state.
// It is expected that the user of this class (i.e., the Herder) will take the necessary action to
// resolve this (i.e., get the connector to recommit its configuration). This inconsistent state is
// exposed in the snapshots provided via ClusterConfigState so they are easy to handle.
if (!(value.value() instanceof Map)) { // Schema-less, so we get maps instead of structs
log.error("Ignoring connector tasks configuration commit for connector " + connectorName + " because it is in the wrong format: " + value.value());
Map<ConnectorTaskId, Map<String, String>> deferred = deferredTaskUpdates.get(connectorName);
int newTaskCount = intValue(((Map<String, Object>) value.value()).get("tasks"));
// Validate the configs we're supposed to update to ensure we're getting a complete configuration
// update of all tasks that are expected based on the number of tasks in the commit message.
Map<String, Set<Integer>> updatedConfigIdsByConnector = taskIdsByConnector(deferred);
Set<Integer> taskIdSet = updatedConfigIdsByConnector.get(connectorName);
if (taskIdSet == null) {
//TODO: Figure out why this happens (KAFKA-3321)
log.error("Received a commit message for connector " + connectorName + " but there is no matching configuration for tasks in this connector. This should never happen.");
if (!completeTaskIdSet(taskIdSet, newTaskCount)) {
// Given the logic for writing commit messages, we should only hit this condition due to compacted
// historical data, in which case we would not have applied any updates yet and there will be no
// task config data already committed for the connector, so we shouldn't have to clear any data
// out. All we need to do is add the flag marking it inconsistent.
log.debug("We have an incomplete set of task configs for connector " + connectorName + " probably due to compaction. So we are not doing anything with the new configuration.");
} else {
if (deferred != null) {
// Always clear the deferred entries, even if we didn't apply them. If they represented an inconsistent
// update, then we need to see a completely fresh set of configs after this commit message, so we don't
// want any of these outdated configs
if (deferred != null)
connectorTaskCounts.put(connectorName, newTaskCount);
if (!starting)
} else {
log.error("Discarding config update record with invalid key: " + record.key());
private ConnectorTaskId parseTaskId(String key) {
String[] parts = key.split("-");
if (parts.length < 3) return null;
try {
int taskNum = Integer.parseInt(parts[parts.length - 1]);
String connectorName = Utils.join(Arrays.copyOfRange(parts, 1, parts.length - 1), "-");
return new ConnectorTaskId(connectorName, taskNum);
} catch (NumberFormatException e) {
return null;
* Given task configurations, get a set of integer task IDs organized by connector name.
private Map<String, Set<Integer>> taskIdsByConnector(Map<ConnectorTaskId, Map<String, String>> configs) {
Map<String, Set<Integer>> connectorTaskIds = new HashMap<>();
if (configs == null)
return connectorTaskIds;
for (Map.Entry<ConnectorTaskId, Map<String, String>> taskConfigEntry : configs.entrySet()) {
ConnectorTaskId taskId = taskConfigEntry.getKey();
if (!connectorTaskIds.containsKey(taskId.connector()))
connectorTaskIds.put(taskId.connector(), new TreeSet<Integer>());
return connectorTaskIds;
private boolean completeTaskIdSet(Set<Integer> idSet, int expectedSize) {
// Note that we do *not* check for the exact set. This is an important implication of compaction. If we start out
// with 2 tasks, then reduce to 1, we'll end up with log entries like:
// 1. Connector "foo" config
// 2. Connector "foo", task 1 config
// 3. Connector "foo", task 2 config
// 4. Connector "foo", commit 2 tasks
// 5. Connector "foo", task 1 config
// 6. Connector "foo", commit 1 tasks
// However, due to compaction we could end up with a log that looks like this:
// 1. Connector "foo" config
// 3. Connector "foo", task 2 config
// 5. Connector "foo", task 1 config
// 6. Connector "foo", commit 1 tasks
// which isn't incorrect, but would appear in this code to have an extra task configuration. Instead, we just
// validate that all the configs specified by the commit message are present. This should be fine because the
// logic for writing configs ensures all the task configs are written (and reads them back) before writing the
// commit message.
if (idSet.size() < expectedSize)
return false;
for (int i = 0; i < expectedSize; i++)
if (!idSet.contains(i))
return false;
return true;
// Convert an integer value extracted from a schemaless struct to an int. This handles potentially different
// encodings by different Converters.
private static int intValue(Object value) {
if (value instanceof Integer)
return (int) value;
else if (value instanceof Long)
return (int) (long) value;
throw new ConnectException("Expected integer value to be either Integer or Long");