/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.connect.runtime.distributed;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.connector.ConnectorContext;
import org.apache.kafka.connect.errors.AlreadyExistsException;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.NotFoundException;
import org.apache.kafka.connect.runtime.AbstractHerder;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.HerderConnectorContext;
import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.runtime.TaskConfig;
import org.apache.kafka.connect.runtime.Worker;
import org.apache.kafka.connect.runtime.rest.RestServer;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
import org.apache.kafka.connect.storage.ConfigBackingStore;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* <p>
* Distributed "herder" that coordinates with other workers to spread work across multiple processes.
* </p>
* <p>
* Under the hood, this is implemented as a group managed by Kafka's group membership facilities (i.e. the generalized
 * group/consumer coordinator). Each instance of DistributedHerder joins the group and indicates what its current
* configuration state is (where it is in the configuration log). The group coordinator selects one member to take
* this information and assign each instance a subset of the active connectors & tasks to execute. This assignment
* is currently performed in a simple round-robin fashion, but this is not guaranteed -- the herder may also choose
* to, e.g., use a sticky assignment to avoid the usual start/stop costs associated with connectors and tasks. Once
* an assignment is received, the DistributedHerder simply runs its assigned connectors and tasks in a Worker.
* </p>
* <p>
 * In addition to distributing work, the DistributedHerder relies on the leader determined during the work assignment
 * for this generation of the group to perform tasks that can only safely be handled by a single node at a time.
 * Most importantly, this includes writing updated configurations for connectors and tasks (and therefore also
 * creating, destroying, and scaling connectors up and down).
* </p>
* <p>
* The DistributedHerder uses a single thread for most of its processing. This includes processing
* config changes, handling task rebalances and serving requests from the HTTP layer. The latter are pushed
* into a queue until the thread has time to handle them. A consequence of this is that requests can get blocked
* behind a worker rebalance. When the herder knows that a rebalance is expected, it typically returns an error
* immediately to the request, but this is not always possible (in particular when another worker has requested
* the rebalance). Similar to handling HTTP requests, config changes which are observed asynchronously by polling
* the config log are batched for handling in the work thread.
* </p>
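 * <p>
 * A minimal construction/startup sketch (illustrative only; it assumes a DistributedConfig, Worker, Time implementation,
 * backing stores, and the worker's REST URL have already been created elsewhere):
 * </p>
 * <pre>{@code
 * DistributedHerder herder = new DistributedHerder(config, time, worker,
 *         statusBackingStore, configBackingStore, restUrl);
 * herder.start();  // spawns the "DistributedHerder" work thread, which runs tick() until stop() is requested
 * // ... later, during shutdown:
 * herder.stop();   // wakes the work thread, waits for it to halt, and shuts down the request-forwarding executor
 * }</pre>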
*/
public class DistributedHerder extends AbstractHerder implements Runnable {
private static final Logger log = LoggerFactory.getLogger(DistributedHerder.class);
private static final long RECONFIGURE_CONNECTOR_TASKS_BACKOFF_MS = 250;
private final Time time;
private final int workerSyncTimeoutMs;
private final int workerUnsyncBackoffMs;
private final ExecutorService forwardRequestExecutor;
private final WorkerGroupMember member;
private final AtomicBoolean stopping;
private final CountDownLatch stopLatch = new CountDownLatch(1);
// Track enough information about the current membership state to be able to determine which requests via the API
    // and from other nodes are safe to process
private boolean rebalanceResolved;
private ConnectProtocol.Assignment assignment;
private boolean canReadConfigs;
private ClusterConfigState configState;
// To handle most external requests, like creating or destroying a connector, we can use a generic request where
// the caller specifies all the code that should be executed.
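    // Requests are ordered by their scheduled execution time, so delayed requests (e.g. reconfiguration retries)
    // sit in the queue until they come due.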
private final Queue<HerderRequest> requests = new PriorityQueue<>();
// Config updates can be collected and applied together when possible. Also, we need to take care to rebalance when
// needed (e.g. task reconfiguration, which requires everyone to coordinate offset commits).
private Set<String> connectorConfigUpdates = new HashSet<>();
// Similarly collect target state changes (when observed by the config storage listener) for handling in the
// herder's main thread.
private Set<String> connectorTargetStateChanges = new HashSet<>();
private boolean needsReconfigRebalance;
private volatile int generation;
public DistributedHerder(DistributedConfig config,
Time time,
Worker worker,
StatusBackingStore statusBackingStore,
ConfigBackingStore configBackingStore,
String restUrl) {
this(config, worker, worker.workerId(), statusBackingStore, configBackingStore, null, restUrl, time);
configBackingStore.setUpdateListener(new ConfigUpdateListener());
}
// visible for testing
DistributedHerder(DistributedConfig config,
Worker worker,
String workerId,
StatusBackingStore statusBackingStore,
ConfigBackingStore configStorage,
WorkerGroupMember member,
String restUrl,
Time time) {
super(worker, workerId, statusBackingStore, configStorage);
this.time = time;
this.workerSyncTimeoutMs = config.getInt(DistributedConfig.WORKER_SYNC_TIMEOUT_MS_CONFIG);
this.workerUnsyncBackoffMs = config.getInt(DistributedConfig.WORKER_UNSYNC_BACKOFF_MS_CONFIG);
this.member = member != null ? member : new WorkerGroupMember(config, restUrl, this.configBackingStore, new RebalanceListener(), time);
this.forwardRequestExecutor = Executors.newSingleThreadExecutor();
stopping = new AtomicBoolean(false);
configState = ClusterConfigState.EMPTY;
        rebalanceResolved = true; // no rebalance has occurred yet, so there is no follow-up work (e.g. starting assigned tasks) pending
needsReconfigRebalance = false;
        canReadConfigs = true; // we haven't tried yet, but assume configs are readable until proven otherwise
}
@Override
public void start() {
Thread thread = new Thread(this, "DistributedHerder");
thread.start();
}
@Override
public void run() {
try {
log.info("Herder starting");
startServices();
log.info("Herder started");
while (!stopping.get()) {
tick();
}
halt();
log.info("Herder stopped");
} catch (Throwable t) {
log.error("Uncaught exception in herder work thread, exiting: ", t);
stopLatch.countDown();
System.exit(1);
} finally {
stopLatch.countDown();
}
}
// public for testing
public void tick() {
// The main loop does two primary things: 1) drive the group membership protocol, responding to rebalance events
// as they occur, and 2) handle external requests targeted at the leader. All the "real" work of the herder is
// performed in this thread, which keeps synchronization straightforward at the cost of some operations possibly
// blocking up this thread (especially those in callbacks due to rebalance events).
try {
// if we failed to read to end of log before, we need to make sure the issue was resolved before joining group
// Joining and immediately leaving for failure to read configs is exceedingly impolite
if (!canReadConfigs && !readConfigToEnd(workerSyncTimeoutMs))
return; // Safe to return and tick immediately because readConfigToEnd will do the backoff for us
member.ensureActive();
            // Ensure we're in a good state in our group. If not, return; everything should already be set up to rejoin
if (!handleRebalanceCompleted()) return;
} catch (WakeupException e) {
// May be due to a request from another thread, or might be stopping. If the latter, we need to check the
// flag immediately. If the former, we need to re-run the ensureActive call since we can't handle requests
// unless we're in the group.
return;
}
// Process any external requests
final long now = time.milliseconds();
long nextRequestTimeoutMs = Long.MAX_VALUE;
while (true) {
final HerderRequest next;
synchronized (this) {
next = requests.peek();
if (next == null) {
break;
} else if (now >= next.at) {
requests.poll();
} else {
nextRequestTimeoutMs = next.at - now;
break;
}
}
try {
next.action().call();
next.callback().onCompletion(null, null);
} catch (Throwable t) {
next.callback().onCompletion(t, null);
}
}
// Process any configuration updates
Set<String> connectorConfigUpdatesCopy = null;
Set<String> connectorTargetStateChangesCopy = null;
synchronized (this) {
if (needsReconfigRebalance || !connectorConfigUpdates.isEmpty() || !connectorTargetStateChanges.isEmpty()) {
                // Connector reconfigs only need local updates since no coordination between workers is required.
                // However, if connectors were added or removed, the work needs to be rebalanced since the set of
                // work items to distribute among workers has changed.
configState = configBackingStore.snapshot();
if (needsReconfigRebalance) {
// Task reconfigs require a rebalance. Request the rebalance, clean out state, and then restart
// this loop, which will then ensure the rebalance occurs without any other requests being
// processed until it completes.
member.requestRejoin();
// Any connector config updates or target state changes will be addressed during the rebalance too
connectorConfigUpdates.clear();
connectorTargetStateChanges.clear();
needsReconfigRebalance = false;
return;
} else {
if (!connectorConfigUpdates.isEmpty()) {
// We can't start/stop while locked since starting connectors can cause task updates that will
// require writing configs, which in turn make callbacks into this class from another thread that
// require acquiring a lock. This leads to deadlock. Instead, just copy the info we need and process
// the updates after unlocking.
connectorConfigUpdatesCopy = connectorConfigUpdates;
connectorConfigUpdates = new HashSet<>();
}
if (!connectorTargetStateChanges.isEmpty()) {
// Similarly for target state changes which can cause connectors to be restarted
connectorTargetStateChangesCopy = connectorTargetStateChanges;
connectorTargetStateChanges = new HashSet<>();
}
}
}
}
if (connectorConfigUpdatesCopy != null)
processConnectorConfigUpdates(connectorConfigUpdatesCopy);
if (connectorTargetStateChangesCopy != null)
processTargetStateChanges(connectorTargetStateChangesCopy);
// Let the group take any actions it needs to
try {
member.poll(nextRequestTimeoutMs);
            // Ensure we're in a good state in our group. If not, return; everything should already be set up to rejoin
if (!handleRebalanceCompleted()) return;
} catch (WakeupException e) { // FIXME should not be WakeupException
// Ignore. Just indicates we need to check the exit flag, for requested actions, etc.
}
}
private void processConnectorConfigUpdates(Set<String> connectorConfigUpdates) {
// If we only have connector config updates, we can just bounce the updated connectors that are
// currently assigned to this worker.
Set<String> localConnectors = assignment == null ? Collections.<String>emptySet() : new HashSet<>(assignment.connectors());
for (String connectorName : connectorConfigUpdates) {
if (!localConnectors.contains(connectorName))
continue;
boolean remains = configState.contains(connectorName);
log.info("Handling connector-only config update by {} connector {}",
remains ? "restarting" : "stopping", connectorName);
worker.stopConnector(connectorName);
// The update may be a deletion, so verify we actually need to restart the connector
if (remains)
startConnector(connectorName);
}
}
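    // Apply staged target state changes (e.g. pause/resume) to connectors currently running in this worker; if a
    // connector has been resumed, also regenerate its task configs.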
private void processTargetStateChanges(Set<String> connectorTargetStateChanges) {
if (!connectorTargetStateChanges.isEmpty()) {
for (String connector : connectorTargetStateChanges) {
if (worker.connectorNames().contains(connector)) {
TargetState targetState = configState.targetState(connector);
worker.setTargetState(connector, targetState);
if (targetState == TargetState.STARTED)
reconfigureConnectorTasksWithRetry(connector);
}
}
}
}
// public for testing
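    // Stop everything this worker currently owns: assigned connectors and tasks, group membership, any outstanding
    // requests (failed with an explanatory error), and the herder's backing services.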
public void halt() {
synchronized (this) {
// Clean up any connectors and tasks that are still running.
log.info("Stopping connectors and tasks that are still assigned to this worker.");
for (String connName : new HashSet<>(worker.connectorNames())) {
try {
worker.stopConnector(connName);
} catch (Throwable t) {
log.error("Failed to shut down connector " + connName, t);
}
}
Set<ConnectorTaskId> tasks = new HashSet<>(worker.taskIds());
worker.stopTasks(tasks);
worker.awaitStopTasks(tasks);
member.stop();
// Explicitly fail any outstanding requests so they actually get a response and get an understandable reason
// for their failure
while (!requests.isEmpty()) {
HerderRequest request = requests.poll();
request.callback().onCompletion(new ConnectException("Worker is shutting down"), null);
}
stopServices();
}
}
@Override
public void stop() {
log.info("Herder stopping");
stopping.set(true);
member.wakeup();
while (stopLatch.getCount() > 0) {
try {
stopLatch.await();
} catch (InterruptedException e) {
// ignore, should not happen
}
}
forwardRequestExecutor.shutdown();
try {
if (!forwardRequestExecutor.awaitTermination(10000, TimeUnit.MILLISECONDS))
forwardRequestExecutor.shutdownNow();
} catch (InterruptedException e) {
// ignore
}
log.info("Herder stopped");
}
@Override
public synchronized void connectors(final Callback<Collection<String>> callback) {
log.trace("Submitting connector listing request");
addRequest(
new Callable<Void>() {
@Override
public Void call() throws Exception {
if (checkRebalanceNeeded(callback))
return null;
callback.onCompletion(null, configState.connectors());
return null;
}
},
forwardErrorCallback(callback)
);
}
@Override
public synchronized void connectorInfo(final String connName, final Callback<ConnectorInfo> callback) {
log.trace("Submitting connector info request {}", connName);
addRequest(
new Callable<Void>() {
@Override
public Void call() throws Exception {
if (checkRebalanceNeeded(callback))
return null;
if (!configState.contains(connName)) {
callback.onCompletion(new NotFoundException("Connector " + connName + " not found"), null);
} else {
callback.onCompletion(null, new ConnectorInfo(connName, configState.connectorConfig(connName), configState.tasks(connName)));
}
return null;
}
},
forwardErrorCallback(callback)
);
}
@Override
public void connectorConfig(String connName, final Callback<Map<String, String>> callback) {
// Subset of connectorInfo, so piggy back on that implementation
log.trace("Submitting connector config read request {}", connName);
connectorInfo(connName, new Callback<ConnectorInfo>() {
@Override
public void onCompletion(Throwable error, ConnectorInfo result) {
if (error != null)
callback.onCompletion(error, null);
else
callback.onCompletion(null, result.config());
}
});
}
@Override
public void putConnectorConfig(final String connName, final Map<String, String> config, final boolean allowReplace,
final Callback<Created<ConnectorInfo>> callback) {
log.trace("Submitting connector config write request {}", connName);
addRequest(
new Callable<Void>() {
@Override
public Void call() throws Exception {
log.trace("Handling connector config request {}", connName);
if (!isLeader()) {
callback.onCompletion(new NotLeaderException("Only the leader can set connector configs.", leaderUrl()), null);
return null;
}
boolean exists = configState.contains(connName);
if (!allowReplace && exists) {
callback.onCompletion(new AlreadyExistsException("Connector " + connName + " already exists"), null);
return null;
}
if (config == null) {
if (!exists) {
callback.onCompletion(new NotFoundException("Connector " + connName + " not found"), null);
} else {
log.trace("Removing connector config {} {} {}", connName, allowReplace, configState.connectors());
configBackingStore.removeConnectorConfig(connName);
callback.onCompletion(null, new Created<ConnectorInfo>(false, null));
}
return null;
}
log.trace("Submitting connector config {} {} {}", connName, allowReplace, configState.connectors());
configBackingStore.putConnectorConfig(connName, config);
// Note that we use the updated connector config despite the fact that we don't have an updated
// snapshot yet. The existing task info should still be accurate.
ConnectorInfo info = new ConnectorInfo(connName, config, configState.tasks(connName));
callback.onCompletion(null, new Created<>(!exists, info));
return null;
}
},
forwardErrorCallback(callback)
);
}
@Override
public synchronized void requestTaskReconfiguration(final String connName) {
log.trace("Submitting connector task reconfiguration request {}", connName);
addRequest(
new Callable<Void>() {
@Override
public Void call() throws Exception {
reconfigureConnectorTasksWithRetry(connName);
return null;
}
},
new Callback<Void>() {
@Override
public void onCompletion(Throwable error, Void result) {
if (error != null) {
log.error("Unexpected error during task reconfiguration: ", error);
log.error("Task reconfiguration for {} failed unexpectedly, this connector will not be properly reconfigured unless manually triggered.", connName);
}
}
}
);
}
@Override
public synchronized void taskConfigs(final String connName, final Callback<List<TaskInfo>> callback) {
log.trace("Submitting get task configuration request {}", connName);
addRequest(
new Callable<Void>() {
@Override
public Void call() throws Exception {
if (checkRebalanceNeeded(callback))
return null;
if (!configState.contains(connName)) {
callback.onCompletion(new NotFoundException("Connector " + connName + " not found"), null);
} else {
List<TaskInfo> result = new ArrayList<>();
for (int i = 0; i < configState.taskCount(connName); i++) {
ConnectorTaskId id = new ConnectorTaskId(connName, i);
result.add(new TaskInfo(id, configState.taskConfig(id)));
}
callback.onCompletion(null, result);
}
return null;
}
},
forwardErrorCallback(callback)
);
}
@Override
public synchronized void putTaskConfigs(final String connName, final List<Map<String, String>> configs, final Callback<Void> callback) {
log.trace("Submitting put task configuration request {}", connName);
addRequest(
new Callable<Void>() {
@Override
public Void call() throws Exception {
if (!isLeader())
callback.onCompletion(new NotLeaderException("Only the leader may write task configurations.", leaderUrl()), null);
else if (!configState.contains(connName))
callback.onCompletion(new NotFoundException("Connector " + connName + " not found"), null);
else {
configBackingStore.putTaskConfigs(connName, configs);
callback.onCompletion(null, null);
}
return null;
}
},
forwardErrorCallback(callback)
);
}
@Override
public synchronized void restartConnector(final String connName, final Callback<Void> callback) {
addRequest(new Callable<Void>() {
@Override
public Void call() throws Exception {
if (checkRebalanceNeeded(callback))
return null;
if (!configState.connectors().contains(connName)) {
callback.onCompletion(new NotFoundException("Unknown connector: " + connName), null);
return null;
}
if (worker.ownsConnector(connName)) {
try {
worker.stopConnector(connName);
startConnector(connName);
callback.onCompletion(null, null);
} catch (Throwable t) {
callback.onCompletion(t, null);
}
} else if (isLeader()) {
callback.onCompletion(new NotAssignedException("Cannot restart connector since it is not assigned to this member", member.ownerUrl(connName)), null);
} else {
callback.onCompletion(new NotLeaderException("Cannot restart connector since it is not assigned to this member", leaderUrl()), null);
}
return null;
}
}, forwardErrorCallback(callback));
}
@Override
public synchronized void restartTask(final ConnectorTaskId id, final Callback<Void> callback) {
addRequest(new Callable<Void>() {
@Override
public Void call() throws Exception {
if (checkRebalanceNeeded(callback))
return null;
if (!configState.connectors().contains(id.connector())) {
callback.onCompletion(new NotFoundException("Unknown connector: " + id.connector()), null);
return null;
}
if (configState.taskConfig(id) == null) {
callback.onCompletion(new NotFoundException("Unknown task: " + id), null);
return null;
}
if (worker.ownsTask(id)) {
try {
worker.stopAndAwaitTask(id);
startTask(id);
callback.onCompletion(null, null);
} catch (Throwable t) {
callback.onCompletion(t, null);
}
} else if (isLeader()) {
callback.onCompletion(new NotAssignedException("Cannot restart task since it is not assigned to this member", member.ownerUrl(id)), null);
} else {
callback.onCompletion(new NotLeaderException("Cannot restart task since it is not assigned to this member", leaderUrl()), null);
}
return null;
}
}, forwardErrorCallback(callback));
}
@Override
public int generation() {
return generation;
}
// Should only be called from work thread, so synchronization should not be needed
private boolean isLeader() {
return assignment != null && member.memberId().equals(assignment.leader());
}
/**
* Get the URL for the leader's REST interface, or null if we do not have the leader's URL yet.
*/
private String leaderUrl() {
if (assignment == null)
return null;
return assignment.leaderUrl();
}
/**
     * Handle post-assignment operations: try to resolve any issues that kept the assignment from completing, get this
     * node in sync with the config log, and start its assigned work.
*
* @return false if we couldn't finish
*/
private boolean handleRebalanceCompleted() {
if (this.rebalanceResolved)
return true;
// We need to handle a variety of cases after a rebalance:
// 1. Assignment failed
// 1a. We are the leader for the round. We will be leader again if we rejoin now, so we need to catch up before
// even attempting to. If we can't we should drop out of the group because we will block everyone from making
// progress. We can backoff and try rejoining later.
// 1b. We are not the leader. We might need to catch up. If we're already caught up we can rejoin immediately,
        //     otherwise, we just want to wait a reasonable amount of time to catch up and rejoin if we are ready.
// 2. Assignment succeeded.
// 2a. We are caught up on configs. Awesome! We can proceed to run our assigned work.
// 2b. We need to try to catch up - try reading configs for reasonable amount of time.
boolean needsReadToEnd = false;
boolean needsRejoin = false;
if (assignment.failed()) {
needsRejoin = true;
if (isLeader()) {
log.warn("Join group completed, but assignment failed and we are the leader. Reading to end of config and retrying.");
needsReadToEnd = true;
} else if (configState.offset() < assignment.offset()) {
log.warn("Join group completed, but assignment failed and we lagging. Reading to end of config and retrying.");
needsReadToEnd = true;
} else {
log.warn("Join group completed, but assignment failed. We were up to date, so just retrying.");
}
} else {
if (configState.offset() < assignment.offset()) {
log.warn("Catching up to assignment's config offset.");
needsReadToEnd = true;
}
}
if (needsReadToEnd) {
// Force exiting this method to avoid creating any connectors/tasks and require immediate rejoining if
// we timed out. This should only happen if we failed to read configuration for long enough,
// in which case giving back control to the main loop will prevent hanging around indefinitely after getting kicked out of the group.
// We also indicate to the main loop that we failed to readConfigs so it will check that the issue was resolved before trying to join the group
if (!readConfigToEnd(workerSyncTimeoutMs)) {
canReadConfigs = false;
needsRejoin = true;
}
}
if (needsRejoin) {
member.requestRejoin();
return false;
}
        // Should still validate that they match since we may have gone *past* the required offset, in which case we
        // should *not* start any tasks but should instead rejoin
if (configState.offset() != assignment.offset()) {
log.info("Current config state offset {} does not match group assignment {}. Forcing rebalance.", configState.offset(), assignment.offset());
member.requestRejoin();
return false;
}
startWork();
        // We only mark this as resolved once we've actually started work, which allows us to correctly track
        // what work is currently active and running. If we bail early, the main tick loop + having requested rejoin
// guarantees we'll attempt to rejoin before executing this method again.
rebalanceResolved = true;
return true;
}
/**
* Try to read to the end of the config log within the given timeout
* @param timeoutMs maximum time to wait to sync to the end of the log
* @return true if successful, false if timed out
*/
private boolean readConfigToEnd(long timeoutMs) {
log.info("Current config state offset {} is behind group assignment {}, reading to end of config log", configState.offset(), assignment.offset());
try {
configBackingStore.refresh(timeoutMs, TimeUnit.MILLISECONDS);
configState = configBackingStore.snapshot();
log.info("Finished reading to end of log and updated config snapshot, new config log offset: {}", configState.offset());
return true;
} catch (TimeoutException e) {
// in case reading the log takes too long, leave the group to ensure a quick rebalance (although by default we should be out of the group already)
// and back off to avoid a tight loop of rejoin-attempt-to-catch-up-leave
log.warn("Didn't reach end of config log quickly enough", e);
member.maybeLeaveGroup();
backoff(workerUnsyncBackoffMs);
return false;
}
}
private void backoff(long ms) {
Utils.sleep(ms);
}
private void startWork() {
// Start assigned connectors and tasks
log.info("Starting connectors and tasks using config offset {}", assignment.offset());
for (String connectorName : assignment.connectors()) {
try {
startConnector(connectorName);
} catch (ConfigException e) {
log.error("Couldn't instantiate connector " + connectorName + " because it has an invalid connector " +
"configuration. This connector will not execute until reconfigured.", e);
}
}
for (ConnectorTaskId taskId : assignment.tasks()) {
try {
startTask(taskId);
} catch (ConfigException e) {
log.error("Couldn't instantiate task " + taskId + " because it has an invalid task " +
"configuration. This task will not execute until reconfigured.", e);
}
}
log.info("Finished starting connectors and tasks");
}
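    // Start a single assigned task, using its config and its connector's current target state from the latest snapshot.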
private void startTask(ConnectorTaskId taskId) {
log.info("Starting task {}", taskId);
TargetState initialState = configState.targetState(taskId.connector());
Map<String, String> configs = configState.taskConfig(taskId);
TaskConfig taskConfig = new TaskConfig(configs);
worker.startTask(taskId, taskConfig, this, initialState);
}
// Helper for starting a connector with the given name, which will extract & parse the config, generate connector
// context and add to the worker. This needs to be called from within the main worker thread for this herder.
private void startConnector(String connectorName) {
log.info("Starting connector {}", connectorName);
Map<String, String> configs = configState.connectorConfig(connectorName);
ConnectorConfig connConfig = new ConnectorConfig(configs);
String connName = connConfig.getString(ConnectorConfig.NAME_CONFIG);
ConnectorContext ctx = new HerderConnectorContext(DistributedHerder.this, connName);
TargetState initialState = configState.targetState(connectorName);
worker.startConnector(connConfig, ctx, this, initialState);
// Immediately request configuration since this could be a brand new connector. However, also only update those
// task configs if they are actually different from the existing ones to avoid unnecessary updates when this is
// just restoring an existing connector.
if (initialState == TargetState.STARTED)
reconfigureConnectorTasksWithRetry(connName);
}
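    // Ask the connector for its current task configs and write them out (directly if we are the leader, otherwise by
    // forwarding to the leader), scheduling a retry with backoff if the attempt fails.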
private void reconfigureConnectorTasksWithRetry(final String connName) {
reconfigureConnector(connName, new Callback<Void>() {
@Override
public void onCompletion(Throwable error, Void result) {
// If we encountered an error, we don't have much choice but to just retry. If we don't, we could get
// stuck with a connector that thinks it has generated tasks, but wasn't actually successful and therefore
// never makes progress. The retry has to run through a HerderRequest since this callback could be happening
// from the HTTP request forwarding thread.
if (error != null) {
log.error("Failed to reconfigure connector's tasks, retrying after backoff:", error);
addRequest(RECONFIGURE_CONNECTOR_TASKS_BACKOFF_MS,
new Callable<Void>() {
@Override
public Void call() throws Exception {
reconfigureConnectorTasksWithRetry(connName);
return null;
}
}, new Callback<Void>() {
@Override
public void onCompletion(Throwable error, Void result) {
log.error("Unexpected error during connector task reconfiguration: ", error);
log.error("Task reconfiguration for {} failed unexpectedly, this connector will not be properly reconfigured unless manually triggered.", connName);
}
}
);
}
}
});
}
// Updates configurations for a connector by requesting them from the connector, filling in parameters provided
// by the system, then checks whether any configs have actually changed before submitting the new configs to storage
private void reconfigureConnector(final String connName, final Callback<Void> cb) {
try {
if (!worker.isRunning(connName)) {
log.info("Skipping reconfiguration of connector {} since it is not running", connName);
return;
}
Map<String, String> configs = configState.connectorConfig(connName);
ConnectorConfig connConfig = new ConnectorConfig(configs);
List<String> sinkTopics = null;
if (worker.isSinkConnector(connName))
sinkTopics = connConfig.getList(ConnectorConfig.TOPICS_CONFIG);
final List<Map<String, String>> taskProps
= worker.connectorTaskConfigs(connName, connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG), sinkTopics);
boolean changed = false;
int currentNumTasks = configState.taskCount(connName);
if (taskProps.size() != currentNumTasks) {
log.debug("Change in connector task count from {} to {}, writing updated task configurations", currentNumTasks, taskProps.size());
changed = true;
} else {
int index = 0;
for (Map<String, String> taskConfig : taskProps) {
if (!taskConfig.equals(configState.taskConfig(new ConnectorTaskId(connName, index)))) {
log.debug("Change in task configurations, writing updated task configurations");
changed = true;
break;
}
index++;
}
}
if (changed) {
if (isLeader()) {
configBackingStore.putTaskConfigs(connName, taskProps);
cb.onCompletion(null, null);
} else {
// We cannot forward the request on the same thread because this reconfiguration can happen as a result of connector
// addition or removal. If we blocked waiting for the response from leader, we may be kicked out of the worker group.
forwardRequestExecutor.submit(new Runnable() {
@Override
public void run() {
try {
String reconfigUrl = RestServer.urlJoin(leaderUrl(), "/connectors/" + connName + "/tasks");
RestServer.httpRequest(reconfigUrl, "POST", taskProps, null);
cb.onCompletion(null, null);
} catch (ConnectException e) {
log.error("Request to leader to reconfigure connector tasks failed", e);
cb.onCompletion(e, null);
}
}
});
}
}
} catch (Throwable t) {
cb.onCompletion(t, null);
}
}
private boolean checkRebalanceNeeded(Callback<?> callback) {
// Raise an error if we are expecting a rebalance to begin. This prevents us from forwarding requests
// based on stale leadership or assignment information
if (needsReconfigRebalance) {
callback.onCompletion(new RebalanceNeededException("Request cannot be completed because a rebalance is expected"), null);
return true;
}
return false;
}
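    // Enqueue a request for the herder's main thread, to be run either immediately or after the given delay.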
private void addRequest(Callable<Void> action, Callback<Void> callback) {
addRequest(0, action, callback);
}
private void addRequest(long delayMs, Callable<Void> action, Callback<Void> callback) {
HerderRequest req = new HerderRequest(time.milliseconds() + delayMs, action, callback);
requests.add(req);
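        // If the new request is now at the head of the queue, wake up the group member's poll so the main thread can
        // recompute its timeout and execute the request on time.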
if (requests.peek() == req)
member.wakeup();
}
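    // Receives config storage callbacks (potentially from other threads), stages the updates under the herder's lock,
    // and wakes the main thread so they are processed in tick().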
public class ConfigUpdateListener implements ConfigBackingStore.UpdateListener {
@Override
public void onConnectorConfigRemove(String connector) {
log.info("Connector {} config removed", connector);
synchronized (DistributedHerder.this) {
// rebalance after connector removal to ensure that existing tasks are balanced among workers
if (configState.contains(connector))
needsReconfigRebalance = true;
connectorConfigUpdates.add(connector);
}
member.wakeup();
}
@Override
public void onConnectorConfigUpdate(String connector) {
log.info("Connector {} config updated", connector);
// Stage the update and wake up the work thread. Connector config *changes* only need the one connector
// to be bounced. However, this callback may also indicate a connector *addition*, which does require
// a rebalance, so we need to be careful about what operation we request.
synchronized (DistributedHerder.this) {
if (!configState.contains(connector))
needsReconfigRebalance = true;
connectorConfigUpdates.add(connector);
}
member.wakeup();
}
@Override
public void onTaskConfigUpdate(Collection<ConnectorTaskId> tasks) {
log.info("Tasks {} configs updated", tasks);
// Stage the update and wake up the work thread. No need to record the set of tasks here because task reconfigs
// always need a rebalance to ensure offsets get committed.
// TODO: As an optimization, some task config updates could avoid a rebalance. In particular, single-task
// connectors clearly don't need any coordination.
synchronized (DistributedHerder.this) {
needsReconfigRebalance = true;
}
member.wakeup();
}
@Override
public void onConnectorTargetStateChange(String connector) {
log.info("Connector {} target state change", connector);
synchronized (DistributedHerder.this) {
connectorTargetStateChanges.add(connector);
}
member.wakeup();
}
}
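    // A unit of work queued for the herder's main thread, ordered by the time at which it should run.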
private class HerderRequest implements Comparable<HerderRequest> {
private final long at;
private final Callable<Void> action;
private final Callback<Void> callback;
public HerderRequest(long at, Callable<Void> action, Callback<Void> callback) {
this.at = at;
this.action = action;
this.callback = callback;
}
public Callable<Void> action() {
return action;
}
public Callback<Void> callback() {
return callback;
}
@Override
public int compareTo(HerderRequest o) {
return Long.compare(at, o.at);
}
}
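    // Wrap a callback so that only errors are forwarded to it; successful results are delivered by the request's
    // action via its own callback.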
private static final Callback<Void> forwardErrorCallback(final Callback<?> callback) {
return new Callback<Void>() {
@Override
public void onCompletion(Throwable error, Void result) {
if (error != null)
callback.onCompletion(error, null);
}
};
}
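    // Clean up persisted status for any connector that no longer appears in the latest config snapshot (i.e. has been deleted).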
private void updateDeletedConnectorStatus() {
ClusterConfigState snapshot = configBackingStore.snapshot();
Set<String> connectors = snapshot.connectors();
for (String connector : statusBackingStore.connectors()) {
if (!connectors.contains(connector)) {
log.debug("Cleaning status information for connector {}", connector);
onDeletion(connector);
}
}
}
// Rebalances are triggered internally from the group member, so these are always executed in the work thread.
public class RebalanceListener implements WorkerRebalanceListener {
@Override
public void onAssigned(ConnectProtocol.Assignment assignment, int generation) {
            // This callback just logs the info and saves it. The actual response is handled in the main loop, which
            // ensures the group member's rebalancing logic can complete, keeps potentially long-running catch-up steps
            // (or backoff if we fail) out of the callback, and allows us to invoke other group membership actions
            // (e.g., we may need to explicitly leave the group if we cannot handle the assigned tasks).
log.info("Joined group and got assignment: {}", assignment);
synchronized (DistributedHerder.this) {
DistributedHerder.this.assignment = assignment;
DistributedHerder.this.generation = generation;
rebalanceResolved = false;
}
            // Delete the statuses of all connectors removed prior to the start of this rebalance. This has to
// be done after the rebalance completes to avoid race conditions as the previous generation attempts
// to change the state to UNASSIGNED after tasks have been stopped.
if (isLeader())
updateDeletedConnectorStatus();
// We *must* interrupt any poll() call since this could occur when the poll starts, and we might then
// sleep in the poll() for a long time. Forcing a wakeup ensures we'll get to process this event in the
// main thread.
member.wakeup();
}
@Override
public void onRevoked(String leader, Collection<String> connectors, Collection<ConnectorTaskId> tasks) {
log.info("Rebalance started");
            // Note that since we don't reset the assignment, we don't revoke leadership here. During a rebalance,
// it is still important to have a leader that can write configs, offsets, etc.
if (rebalanceResolved) {
// TODO: Parallelize this. We should be able to request all connectors and tasks to stop, then wait on all of
// them to finish
// TODO: Technically we don't have to stop connectors at all until we know they've really been removed from
// this worker. Instead, we can let them continue to run but buffer any update requests (which should be
// rare anyway). This would avoid a steady stream of start/stop, which probably also includes lots of
// unnecessary repeated connections to the source/sink system.
for (String connectorName : connectors)
worker.stopConnector(connectorName);
// TODO: We need to at least commit task offsets, but if we could commit offsets & pause them instead of
// stopping them then state could continue to be reused when the task remains on this worker. For example,
// this would avoid having to close a connection and then reopen it when the task is assigned back to this
// worker again.
if (!tasks.isEmpty()) {
worker.stopTasks(tasks); // trigger stop() for all tasks
worker.awaitStopTasks(tasks); // await stopping tasks
}
// Ensure that all status updates have been pushed to the storage system before rebalancing.
// Otherwise, we may inadvertently overwrite the state with a stale value after the rebalance
// completes.
statusBackingStore.flush();
log.info("Finished stopping tasks in preparation for rebalance");
} else {
log.info("Wasn't unable to resume work after last rebalance, can skip stopping connectors and tasks");
}
}
}
}