blob: 4f2d832fc83797f43a759d0be2c1e8d4bc845e18 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.ThreadUtils;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.runtime.AbstractStatus;
import org.apache.kafka.connect.runtime.ConnectorStatus;
import org.apache.kafka.connect.runtime.TaskStatus;
import org.apache.kafka.connect.runtime.TopicStatus;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.apache.kafka.connect.util.Table;
import org.apache.kafka.connect.util.TopicAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
* StatusBackingStore implementation which uses a compacted topic for storage
* of connector and task status information. When a state change is observed,
* the new state is written to the compacted topic. The new state will not be
* visible until it has been read back from the topic.
* <p>
* In spite of their names, the {@link #putSafe} methods cannot guarantee the safety
* of the write (since Kafka itself cannot provide such guarantees currently),
* but it can avoid specific unsafe conditions. In particular, {@link #putSafe}
* allows writes in the following conditions:
* <ol>
* <li>It is (probably) safe to overwrite the state if there is no previous
* value.
* <li>It is (probably) safe to overwrite the state if the previous value was
* set by a worker with the same workerId.
* <li>It is (probably) safe to overwrite the previous state if the current
* generation is higher than the previous .
* </ol>
* Basically all these conditions do is reduce the window for conflicts. They
* obviously cannot take into account in-flight requests.
public class KafkaStatusBackingStore extends KafkaTopicBasedBackingStore implements StatusBackingStore {
private static final Logger log = LoggerFactory.getLogger(KafkaStatusBackingStore.class);
public static final String TASK_STATUS_PREFIX = "status-task-";
public static final String CONNECTOR_STATUS_PREFIX = "status-connector-";
public static final String TOPIC_STATUS_PREFIX = "status-topic-";
public static final String TOPIC_STATUS_SEPARATOR = ":connector-";
public static final String STATE_KEY_NAME = "state";
public static final String TRACE_KEY_NAME = "trace";
public static final String WORKER_ID_KEY_NAME = "worker_id";
public static final String GENERATION_KEY_NAME = "generation";
public static final String TOPIC_STATE_KEY = "topic";
public static final String TOPIC_NAME_KEY = "name";
public static final String TOPIC_CONNECTOR_KEY = "connector";
public static final String TOPIC_TASK_KEY = "task";
public static final String TOPIC_DISCOVER_TIMESTAMP_KEY = "discoverTimestamp";
private static final Schema STATUS_SCHEMA_V0 = SchemaBuilder.struct()
.field(TRACE_KEY_NAME, SchemaBuilder.string().optional().build())
private static final Schema TOPIC_STATUS_VALUE_SCHEMA_V0 = SchemaBuilder.struct()
private static final Schema TOPIC_STATUS_SCHEMA_V0 =
private final Time time;
private final Converter converter;
//visible for testing
protected final Table<String, Integer, CacheEntry<TaskStatus>> tasks;
protected final Map<String, CacheEntry<ConnectorStatus>> connectors;
protected final ConcurrentMap<String, ConcurrentMap<String, TopicStatus>> topics;
private final Supplier<TopicAdmin> topicAdminSupplier;
private final String clientId;
private String statusTopic;
private KafkaBasedLog<String, byte[]> kafkaLog;
private int generation;
private ExecutorService sendRetryExecutor;
public KafkaStatusBackingStore(Time time, Converter converter, Supplier<TopicAdmin> topicAdminSupplier, String clientIdBase) {
this.time = time;
this.converter = converter;
this.tasks = new Table<>();
this.connectors = new HashMap<>();
this.topics = new ConcurrentHashMap<>();
this.topicAdminSupplier = topicAdminSupplier;
this.clientId = Objects.requireNonNull(clientIdBase) + "statuses";
// visible for testing
KafkaStatusBackingStore(Time time, Converter converter, String statusTopic, Supplier<TopicAdmin> topicAdminSupplier,
KafkaBasedLog<String, byte[]> kafkaLog) {
this(time, converter, null, "connect-distributed-");
this.kafkaLog = kafkaLog;
this.statusTopic = statusTopic;
sendRetryExecutor = Executors.newSingleThreadExecutor(
ThreadUtils.createThreadFactory("status-store-retry-" + statusTopic, true));
public void configure(final WorkerConfig config) {
this.statusTopic = config.getString(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG);
if (this.statusTopic == null || this.statusTopic.trim().isEmpty())
throw new ConfigException("Must specify topic for connector status.");
sendRetryExecutor = Executors.newSingleThreadExecutor(
ThreadUtils.createThreadFactory("status-store-retry-" + statusTopic, true));
String clusterId = config.kafkaClusterId();
Map<String, Object> originals = config.originals();
Map<String, Object> producerProps = new HashMap<>(originals);
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
producerProps.put(ProducerConfig.RETRIES_CONFIG, 0); // we handle retries in this class
// By default, Connect disables idempotent behavior for all producers, even though idempotence became
// default for Kafka producers. This is to ensure Connect continues to work with many Kafka broker versions, including older brokers that do not support
// idempotent producers or require explicit steps to enable them (e.g. adding the IDEMPOTENT_WRITE ACL to brokers older than 2.8).
// These settings might change when
// gets approved and scheduled for release.
producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false"); // disable idempotence since retries is force to 0
producerProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId);
ConnectUtils.addMetricsContextProperties(producerProps, config, clusterId);
Map<String, Object> consumerProps = new HashMap<>(originals);
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
consumerProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId);
ConnectUtils.addMetricsContextProperties(consumerProps, config, clusterId);
Map<String, Object> adminProps = new HashMap<>(originals);
adminProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId);
ConnectUtils.addMetricsContextProperties(adminProps, config, clusterId);
Map<String, Object> topicSettings = config instanceof DistributedConfig
? ((DistributedConfig) config).statusStorageTopicSettings()
: Collections.emptyMap();
NewTopic topicDescription = TopicAdmin.defineTopic(statusTopic)
.config(topicSettings) // first so that we override user-supplied settings as needed
Callback<ConsumerRecord<String, byte[]>> readCallback = (error, record) -> read(record);
this.kafkaLog = createKafkaBasedLog(statusTopic, producerProps, consumerProps, readCallback, topicDescription, topicAdminSupplier, config, time);
public void start() {
// read to the end on startup to ensure that api requests see the most recent states
public void stop() {
try {
} finally {
ThreadUtils.shutdownExecutorServiceQuietly(sendRetryExecutor, 10, TimeUnit.SECONDS);
public void put(final ConnectorStatus status) {
sendConnectorStatus(status, false);
public void putSafe(final ConnectorStatus status) {
sendConnectorStatus(status, true);
public void put(final TaskStatus status) {
sendTaskStatus(status, false);
public void putSafe(final TaskStatus status) {
sendTaskStatus(status, true);
public void put(final TopicStatus status) {
sendTopicStatus(status.connector(), status.topic(), status);
public void flush() {
private void sendConnectorStatus(final ConnectorStatus status, boolean safeWrite) {
String connector =;
CacheEntry<ConnectorStatus> entry = getOrAdd(connector);
String key = CONNECTOR_STATUS_PREFIX + connector;
send(key, status, entry, safeWrite);
private void sendTaskStatus(final TaskStatus status, boolean safeWrite) {
ConnectorTaskId taskId =;
CacheEntry<TaskStatus> entry = getOrAdd(taskId);
String key = TASK_STATUS_PREFIX + taskId.connector() + "-" + taskId.task();
send(key, status, entry, safeWrite);
private void sendTopicStatus(final String connector, final String topic, final TopicStatus status) {
String key = TOPIC_STATUS_PREFIX + topic + TOPIC_STATUS_SEPARATOR + connector;
final byte[] value = serializeTopicStatus(status);
kafkaLog.send(key, value, new org.apache.kafka.clients.producer.Callback() {
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) return;
// TODO: retry more gracefully and not forever
if (exception instanceof RetriableException) {
sendRetryExecutor.submit(() -> kafkaLog.send(key, value, this));
} else {
log.error("Failed to write status update", exception);
private <V extends AbstractStatus<?>> void send(final String key,
final V status,
final CacheEntry<V> entry,
final boolean safeWrite) {
final int sequence;
synchronized (this) {
this.generation = status.generation();
if (safeWrite && !entry.canWriteSafely(status))
sequence = entry.increment();
final byte[] value = status.state() == ConnectorStatus.State.DESTROYED ? null : serialize(status);
kafkaLog.send(key, value, new org.apache.kafka.clients.producer.Callback() {
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) return;
if (exception instanceof RetriableException) {
synchronized (KafkaStatusBackingStore.this) {
if (entry.isDeleted()
|| status.generation() != generation
|| (safeWrite && !entry.canWriteSafely(status, sequence)))
sendRetryExecutor.submit(() -> kafkaLog.send(key, value, this));
} else {
log.error("Failed to write status update", exception);
private synchronized CacheEntry<ConnectorStatus> getOrAdd(String connector) {
CacheEntry<ConnectorStatus> entry = connectors.get(connector);
if (entry == null) {
entry = new CacheEntry<>();
connectors.put(connector, entry);
return entry;
private synchronized void remove(String connector) {
CacheEntry<ConnectorStatus> removed = connectors.remove(connector);
if (removed != null)
Map<Integer, CacheEntry<TaskStatus>> tasks = this.tasks.remove(connector);
if (tasks != null) {
for (CacheEntry<TaskStatus> taskEntry : tasks.values())
private synchronized CacheEntry<TaskStatus> getOrAdd(ConnectorTaskId task) {
CacheEntry<TaskStatus> entry = tasks.get(task.connector(), task.task());
if (entry == null) {
entry = new CacheEntry<>();
tasks.put(task.connector(), task.task(), entry);
return entry;
private synchronized void remove(ConnectorTaskId id) {
CacheEntry<TaskStatus> removed = tasks.remove(id.connector(), id.task());
if (removed != null)
private void removeTopic(String topic, String connector) {
ConcurrentMap<String, TopicStatus> activeTopics = topics.get(connector);
if (activeTopics == null) {
public synchronized TaskStatus get(ConnectorTaskId id) {
CacheEntry<TaskStatus> entry = tasks.get(id.connector(), id.task());
return entry == null ? null : entry.get();
public synchronized ConnectorStatus get(String connector) {
CacheEntry<ConnectorStatus> entry = connectors.get(connector);
return entry == null ? null : entry.get();
public synchronized Collection<TaskStatus> getAll(String connector) {
List<TaskStatus> res = new ArrayList<>();
for (CacheEntry<TaskStatus> statusEntry : tasks.row(connector).values()) {
TaskStatus status = statusEntry.get();
if (status != null)
return res;
public TopicStatus getTopic(String connector, String topic) {
ConcurrentMap<String, TopicStatus> activeTopics = topics.get(Objects.requireNonNull(connector));
return activeTopics != null ? activeTopics.get(Objects.requireNonNull(topic)) : null;
public Collection<TopicStatus> getAllTopics(String connector) {
ConcurrentMap<String, TopicStatus> activeTopics = topics.get(Objects.requireNonNull(connector));
return activeTopics != null
? Collections.unmodifiableCollection(Objects.requireNonNull(activeTopics.values()))
: Collections.emptySet();
public void deleteTopic(String connector, String topic) {
sendTopicStatus(Objects.requireNonNull(connector), Objects.requireNonNull(topic), null);
public synchronized Set<String> connectors() {
return new HashSet<>(connectors.keySet());
private ConnectorStatus parseConnectorStatus(String connector, byte[] data) {
try {
SchemaAndValue schemaAndValue = converter.toConnectData(statusTopic, data);
if (!(schemaAndValue.value() instanceof Map)) {
log.error("Invalid connector status type {}", schemaAndValue.value().getClass());
return null;
Map<String, Object> statusMap = (Map<String, Object>) schemaAndValue.value();
TaskStatus.State state = TaskStatus.State.valueOf((String) statusMap.get(STATE_KEY_NAME));
String trace = (String) statusMap.get(TRACE_KEY_NAME);
String workerUrl = (String) statusMap.get(WORKER_ID_KEY_NAME);
int generation = ((Long) statusMap.get(GENERATION_KEY_NAME)).intValue();
return new ConnectorStatus(connector, state, trace, workerUrl, generation);
} catch (Exception e) {
log.error("Failed to deserialize connector status", e);
return null;
private TaskStatus parseTaskStatus(ConnectorTaskId taskId, byte[] data) {
try {
SchemaAndValue schemaAndValue = converter.toConnectData(statusTopic, data);
if (!(schemaAndValue.value() instanceof Map)) {
log.error("Invalid task status type {}", schemaAndValue.value().getClass());
return null;
Map<String, Object> statusMap = (Map<String, Object>) schemaAndValue.value();
TaskStatus.State state = TaskStatus.State.valueOf((String) statusMap.get(STATE_KEY_NAME));
String trace = (String) statusMap.get(TRACE_KEY_NAME);
String workerUrl = (String) statusMap.get(WORKER_ID_KEY_NAME);
int generation = ((Long) statusMap.get(GENERATION_KEY_NAME)).intValue();
return new TaskStatus(taskId, state, workerUrl, generation, trace);
} catch (Exception e) {
log.error("Failed to deserialize task status", e);
return null;
protected TopicStatus parseTopicStatus(byte[] data) {
try {
SchemaAndValue schemaAndValue = converter.toConnectData(statusTopic, data);
if (!(schemaAndValue.value() instanceof Map)) {
log.error("Invalid topic status value {}", schemaAndValue.value());
return null;
Object innerValue = ((Map<String, Object>) schemaAndValue.value()).get(TOPIC_STATE_KEY);
if (!(innerValue instanceof Map)) {
log.error("Invalid topic status value {} for field {}", innerValue, TOPIC_STATE_KEY);
return null;
Map<String, Object> topicStatusMetadata = (Map<String, Object>) innerValue;
return new TopicStatus((String) topicStatusMetadata.get(TOPIC_NAME_KEY),
(String) topicStatusMetadata.get(TOPIC_CONNECTOR_KEY),
((Long) topicStatusMetadata.get(TOPIC_TASK_KEY)).intValue(),
(long) topicStatusMetadata.get(TOPIC_DISCOVER_TIMESTAMP_KEY));
} catch (Exception e) {
log.error("Failed to deserialize topic status", e);
return null;
private byte[] serialize(AbstractStatus<?> status) {
Struct struct = new Struct(STATUS_SCHEMA_V0);
struct.put(STATE_KEY_NAME, status.state().name());
if (status.trace() != null)
struct.put(TRACE_KEY_NAME, status.trace());
struct.put(WORKER_ID_KEY_NAME, status.workerId());
struct.put(GENERATION_KEY_NAME, status.generation());
return converter.fromConnectData(statusTopic, STATUS_SCHEMA_V0, struct);
//visible for testing
protected byte[] serializeTopicStatus(TopicStatus status) {
if (status == null) {
// This should send a tombstone record that will represent delete
return null;
Struct struct = new Struct(TOPIC_STATUS_VALUE_SCHEMA_V0);
struct.put(TOPIC_NAME_KEY, status.topic());
struct.put(TOPIC_CONNECTOR_KEY, status.connector());
struct.put(TOPIC_TASK_KEY, status.task());
struct.put(TOPIC_DISCOVER_TIMESTAMP_KEY, status.discoverTimestamp());
return converter.fromConnectData(
Collections.singletonMap(TOPIC_STATE_KEY, struct));
private String parseConnectorStatusKey(String key) {
return key.substring(CONNECTOR_STATUS_PREFIX.length());
private ConnectorTaskId parseConnectorTaskId(String key) {
String[] parts = key.split("-");
if (parts.length < 4) return null;
try {
int taskNum = Integer.parseInt(parts[parts.length - 1]);
String connectorName = String.join("-", Arrays.copyOfRange(parts, 2, parts.length - 1));
return new ConnectorTaskId(connectorName, taskNum);
} catch (NumberFormatException e) {
log.warn("Invalid task status key {}", key);
return null;
private void readConnectorStatus(String key, byte[] value) {
String connector = parseConnectorStatusKey(key);
if (connector.isEmpty()) {
log.warn("Discarding record with invalid connector status key {}", key);
if (value == null) {
log.trace("Removing status for connector {}", connector);
ConnectorStatus status = parseConnectorStatus(connector, value);
if (status == null)
synchronized (this) {
log.trace("Received connector {} status update {}", connector, status);
CacheEntry<ConnectorStatus> entry = getOrAdd(connector);
private void readTaskStatus(String key, byte[] value) {
ConnectorTaskId id = parseConnectorTaskId(key);
if (id == null) {
log.warn("Discarding record with invalid task status key {}", key);
if (value == null) {
log.trace("Removing task status for {}", id);
TaskStatus status = parseTaskStatus(id, value);
if (status == null) {
log.warn("Failed to parse task status with key {}", key);
synchronized (this) {
log.trace("Received task {} status update {}", id, status);
CacheEntry<TaskStatus> entry = getOrAdd(id);
// During frequent rebalances, there could be a race condition because of which
// an UNASSIGNED state of a prior or same generation can be sent by a worker despite a
// RUNNING status by another worker because the first worker
// couldn't read the latest RUNNING status. This can lead to an inaccurate status
// representation even though the task might be actually running.
// Note that this could also mean that when a generation reset happens, and an
// UNASSIGNED status is sent, then it would be ignored if the current status is RUNNING
// at a higher generation. But since it will be followed by a RUNNING or a different
// status message(at the lower generation) soon after, the misrepresentation of the UNASSIGNED
// status would be short-lived in most cases.
if (status.state() == TaskStatus.State.UNASSIGNED
&& entry.get() != null
&& entry.get().state() == TaskStatus.State.RUNNING
&& entry.get().generation() >= status.generation()) {
log.trace("Ignoring stale status {} in favour of more upto date status {}", status, entry.get());
private void readTopicStatus(String key, byte[] value) {
int delimiterPos = key.indexOf(':');
int beginPos = TOPIC_STATUS_PREFIX.length();
if (beginPos > delimiterPos) {
log.warn("Discarding record with invalid topic status key {}", key);
String topic = key.substring(beginPos, delimiterPos);
if (topic.isEmpty()) {
log.warn("Discarding record with invalid topic status key containing empty topic {}", key);
beginPos = delimiterPos + TOPIC_STATUS_SEPARATOR.length();
int endPos = key.length();
if (beginPos > endPos) {
log.warn("Discarding record with invalid topic status key {}", key);
String connector = key.substring(beginPos);
if (connector.isEmpty()) {
log.warn("Discarding record with invalid topic status key containing empty connector {}", key);
if (value == null) {
log.trace("Removing status for topic {} and connector {}", topic, connector);
removeTopic(topic, connector);
TopicStatus status = parseTopicStatus(value);
if (status == null) {
log.warn("Failed to parse topic status with key {}", key);
log.trace("Received topic status update {}", status);
topics.computeIfAbsent(connector, k -> new ConcurrentHashMap<>())
.put(topic, status);
// visible for testing
void read(ConsumerRecord<String, byte[]> record) {
String key = record.key();
if (key.startsWith(CONNECTOR_STATUS_PREFIX)) {
readConnectorStatus(key, record.value());
} else if (key.startsWith(TASK_STATUS_PREFIX)) {
readTaskStatus(key, record.value());
} else if (key.startsWith(TOPIC_STATUS_PREFIX)) {
readTopicStatus(key, record.value());
} else {
log.warn("Discarding record with invalid key {}", key);
protected String getTopicConfig() {
return DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG;
protected String getTopicPurpose() {
return "connector and task statuses";
private static class CacheEntry<T extends AbstractStatus<?>> {
private T value = null;
private int sequence = 0;
private boolean deleted = false;
public int increment() {
return ++sequence;
public void put(T value) {
this.value = value;
public T get() {
return value;
public void delete() {
this.deleted = true;
public boolean isDeleted() {
return deleted;
public boolean canWriteSafely(T status) {
return value == null
|| value.workerId().equals(status.workerId())
|| value.generation() <= status.generation();
public boolean canWriteSafely(T status, int sequence) {
return canWriteSafely(status) && this.sequence == sequence;