blob: ea5ba82d4ba08ccc7ef152102f5e24b66b447675 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.connect.runtime.distributed;
import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.util.ConnectorTaskId;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
/**
 * An immutable snapshot of the configuration state of connectors and tasks in a Kafka Connect cluster.
 *
 * <p>The collections passed to the constructor are stored by reference, not copied, so whoever builds a snapshot
 * must not modify them afterwards. The set-valued accessors ({@link #connectors()} and
 * {@link #inconsistentConnectors()}) return read-only views so that readers cannot alter the snapshot.
 */
public class ClusterConfigState {
    /** Offset value used by {@link #EMPTY}, i.e. by a snapshot that was not built from any config record. */
    public static final long NO_OFFSET = -1;

    /** A snapshot with no connectors, no tasks and {@link #NO_OFFSET} as its offset. */
    public static final ClusterConfigState EMPTY = new ClusterConfigState(
            NO_OFFSET,
            Collections.<String, Integer>emptyMap(),
            Collections.<String, Map<String, String>>emptyMap(),
            Collections.<String, TargetState>emptyMap(),
            Collections.<ConnectorTaskId, Map<String, String>>emptyMap(),
            Collections.<String>emptySet());

    private final long offset;
    private final Map<String, Integer> connectorTaskCounts;
    private final Map<String, Map<String, String>> connectorConfigs;
    private final Map<String, TargetState> connectorTargetStates;
    private final Map<ConnectorTaskId, Map<String, String>> taskConfigs;
    private final Set<String> inconsistentConnectors;

    /**
     * Create a snapshot. The supplied collections are retained as-is (no defensive copy is made).
     *
     * @param offset the last config offset read to produce this state
     * @param connectorTaskCounts number of tasks per connector name
     * @param connectorConfigs configuration per connector name
     * @param connectorTargetStates target state per connector name
     * @param taskConfigs configuration per task id
     * @param inconsistentConnectors names of connectors whose task configs are incomplete
     */
    public ClusterConfigState(long offset,
                              Map<String, Integer> connectorTaskCounts,
                              Map<String, Map<String, String>> connectorConfigs,
                              Map<String, TargetState> connectorTargetStates,
                              Map<ConnectorTaskId, Map<String, String>> taskConfigs,
                              Set<String> inconsistentConnectors) {
        this.offset = offset;
        this.connectorTaskCounts = connectorTaskCounts;
        this.connectorConfigs = connectorConfigs;
        this.connectorTargetStates = connectorTargetStates;
        this.taskConfigs = taskConfigs;
        this.inconsistentConnectors = inconsistentConnectors;
    }

    /**
     * Get the last offset read to generate this config state. This offset is not guaranteed to be perfectly consistent
     * with the recorded state because some partial updates to task configs may have been read.
     * @return the latest config offset
     */
    public long offset() {
        return offset;
    }

    /**
     * Check whether this snapshot contains configuration for a connector.
     * @param connector name of the connector
     * @return true if this state contains configuration for the connector, false otherwise
     */
    public boolean contains(String connector) {
        return connectorConfigs.containsKey(connector);
    }

    /**
     * Get the names of the connectors that have a configuration in this snapshot.
     * @return a read-only view of the connector names
     */
    public Set<String> connectors() {
        // Hand out a read-only view: the raw key set would let callers remove connectors from the snapshot.
        return Collections.unmodifiableSet(connectorConfigs.keySet());
    }

    /**
     * Get the configuration for a connector.
     * @param connector name of the connector
     * @return a map containing configuration parameters, or null if this snapshot has no entry for the connector
     */
    public Map<String, String> connectorConfig(String connector) {
        return connectorConfigs.get(connector);
    }

    /**
     * Get the target state of the connector
     * @param connector name of the connector
     * @return the target state, or null if this snapshot has no target state recorded for the connector
     */
    public TargetState targetState(String connector) {
        return connectorTargetStates.get(connector);
    }

    /**
     * Get the configuration for a task.
     * @param task id of the task
     * @return a map containing configuration parameters, or null if this snapshot has no entry for the task
     */
    public Map<String, String> taskConfig(ConnectorTaskId task) {
        return taskConfigs.get(task);
    }

    /**
     * Get all task configs for a connector.
     * @param connector name of the connector
     * @return a new list of the connector's task configurations, ordered by ascending task index; empty if none
     */
    public List<Map<String, String>> allTaskConfigs(String connector) {
        // Collect into a sorted map keyed by task index so the result order does not depend on the
        // iteration order of the backing map.
        Map<Integer, Map<String, String>> configsByTaskIndex = new TreeMap<>();
        for (Map.Entry<ConnectorTaskId, Map<String, String>> taskConfigEntry : this.taskConfigs.entrySet()) {
            ConnectorTaskId taskId = taskConfigEntry.getKey();
            if (taskId.connector().equals(connector)) {
                configsByTaskIndex.put(taskId.task(), taskConfigEntry.getValue());
            }
        }
        return new LinkedList<>(configsByTaskIndex.values());
    }

    /**
     * Get the number of tasks assigned for the given connector. Unlike {@link #tasks(String)}, this does not
     * consult the set of inconsistent connectors; it reports the recorded count as-is.
     * @param connectorName name of the connector to look up tasks for
     * @return the number of tasks, or 0 if no count is recorded for the connector
     */
    public int taskCount(String connectorName) {
        Integer count = connectorTaskCounts.get(connectorName);
        return count == null ? 0 : count;
    }

    /**
     * Get the current set of task IDs for the specified connector. The IDs are generated from the recorded task
     * count, with indexes running from 0 up to (but excluding) that count.
     * @param connectorName the name of the connector to look up task configs for
     * @return the current connector task IDs; an empty list if the connector is marked inconsistent or has no
     *         recorded task count
     */
    public List<ConnectorTaskId> tasks(String connectorName) {
        // Task configs of an inconsistent connector are incomplete, so none of its tasks are reported.
        if (inconsistentConnectors.contains(connectorName)) {
            return Collections.emptyList();
        }

        Integer numTasks = connectorTaskCounts.get(connectorName);
        if (numTasks == null) {
            return Collections.emptyList();
        }

        List<ConnectorTaskId> taskIds = new ArrayList<>();
        for (int taskIndex = 0; taskIndex < numTasks; taskIndex++) {
            taskIds.add(new ConnectorTaskId(connectorName, taskIndex));
        }
        return taskIds;
    }

    /**
     * Get the set of connectors which have inconsistent data in this snapshot. These inconsistencies can occur due to
     * partially completed writes combined with log compaction.
     *
     * Connectors in this set will appear in the output of {@link #connectors()} since their connector configuration is
     * available, but {@link #tasks(String)} returns an empty list for them since the task configs are incomplete.
     *
     * When a worker detects a connector in this state, it should request that the connector regenerate its task
     * configurations.
     *
     * @return a read-only view of the set of inconsistent connectors
     */
    public Set<String> inconsistentConnectors() {
        // Read-only view so callers cannot clear or extend the inconsistency markers of this snapshot.
        return Collections.unmodifiableSet(inconsistentConnectors);
    }

    @Override
    public String toString() {
        return "ClusterConfigState{" +
                "offset=" + offset +
                ", connectorTaskCounts=" + connectorTaskCounts +
                ", connectorConfigs=" + connectorConfigs +
                ", connectorTargetStates=" + connectorTargetStates +
                ", taskConfigs=" + taskConfigs +
                ", inconsistentConnectors=" + inconsistentConnectors +
                '}';
    }
}