| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * <p/> |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * <p/> |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| **/ |
| |
| package org.apache.kafka.connect.runtime; |
| |
| import org.apache.kafka.clients.producer.KafkaProducer; |
| import org.apache.kafka.clients.producer.ProducerConfig; |
| import org.apache.kafka.common.KafkaException; |
| import org.apache.kafka.common.utils.Time; |
| import org.apache.kafka.common.utils.Utils; |
| import org.apache.kafka.connect.connector.Connector; |
| import org.apache.kafka.connect.connector.ConnectorContext; |
| import org.apache.kafka.connect.connector.Task; |
| import org.apache.kafka.connect.errors.ConnectException; |
| import org.apache.kafka.connect.sink.SinkTask; |
| import org.apache.kafka.connect.source.SourceTask; |
| import org.apache.kafka.connect.storage.Converter; |
| import org.apache.kafka.connect.storage.OffsetBackingStore; |
| import org.apache.kafka.connect.storage.OffsetStorageReader; |
| import org.apache.kafka.connect.storage.OffsetStorageReaderImpl; |
| import org.apache.kafka.connect.storage.OffsetStorageWriter; |
| import org.apache.kafka.connect.util.ConnectorTaskId; |
| import org.reflections.Reflections; |
| import org.reflections.util.ClasspathHelper; |
| import org.reflections.util.ConfigurationBuilder; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| |
| |
| /** |
| * <p> |
| * Worker runs a (dynamic) set of tasks in a set of threads, doing the work of actually moving |
| * data to/from Kafka. |
| * </p> |
| * <p> |
| * Since each task has a dedicated thread, this is mainly just a container for them. |
| * </p> |
| */ |
| public class Worker { |
| private static final Logger log = LoggerFactory.getLogger(Worker.class); |
| |
| private final ExecutorService executor; |
| private final Time time; |
| private final String workerId; |
| private final WorkerConfig config; |
| private final Converter keyConverter; |
| private final Converter valueConverter; |
| private final Converter internalKeyConverter; |
| private final Converter internalValueConverter; |
| private final OffsetBackingStore offsetBackingStore; |
| |
| private HashMap<String, WorkerConnector> connectors = new HashMap<>(); |
| private HashMap<ConnectorTaskId, WorkerTask> tasks = new HashMap<>(); |
| private KafkaProducer<byte[], byte[]> producer; |
| private SourceTaskOffsetCommitter sourceTaskOffsetCommitter; |
| |
| public Worker(String workerId, |
| Time time, |
| WorkerConfig config, |
| OffsetBackingStore offsetBackingStore) { |
| this.executor = Executors.newCachedThreadPool(); |
| this.workerId = workerId; |
| this.time = time; |
| this.config = config; |
| this.keyConverter = config.getConfiguredInstance(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, Converter.class); |
| this.keyConverter.configure(config.originalsWithPrefix("key.converter."), true); |
| this.valueConverter = config.getConfiguredInstance(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, Converter.class); |
| this.valueConverter.configure(config.originalsWithPrefix("value.converter."), false); |
| this.internalKeyConverter = config.getConfiguredInstance(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, Converter.class); |
| this.internalKeyConverter.configure(config.originalsWithPrefix("internal.key.converter."), true); |
| this.internalValueConverter = config.getConfiguredInstance(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, Converter.class); |
| this.internalValueConverter.configure(config.originalsWithPrefix("internal.value.converter."), false); |
| |
| this.offsetBackingStore = offsetBackingStore; |
| this.offsetBackingStore.configure(config); |
| } |
| |
| public void start() { |
| log.info("Worker starting"); |
| |
| Map<String, Object> producerProps = new HashMap<>(); |
| producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Utils.join(config.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ",")); |
| producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); |
| producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); |
| |
| // These settings are designed to ensure there is no data loss. They *may* be overridden via configs passed to the |
| // worker, but this may compromise the delivery guarantees of Kafka Connect. |
| producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, ((Integer) Integer.MAX_VALUE).toString()); |
| producerProps.put(ProducerConfig.RETRIES_CONFIG, ((Integer) Integer.MAX_VALUE).toString()); |
| producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, ((Long) Long.MAX_VALUE).toString()); |
| producerProps.put(ProducerConfig.ACKS_CONFIG, "all"); |
| producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1"); |
| |
| producerProps.putAll(config.originalsWithPrefix("producer.")); |
| |
| producer = new KafkaProducer<>(producerProps); |
| |
| offsetBackingStore.start(); |
| sourceTaskOffsetCommitter = new SourceTaskOffsetCommitter(config); |
| |
| log.info("Worker started"); |
| } |
| |
| public void stop() { |
| log.info("Worker stopping"); |
| |
| long started = time.milliseconds(); |
| long limit = started + config.getLong(WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG); |
| |
| for (Map.Entry<String, WorkerConnector> entry : connectors.entrySet()) { |
| WorkerConnector workerConnector = entry.getValue(); |
| log.warn("Shutting down connector {} uncleanly; herder should have shut down connectors before the" + |
| "Worker is stopped.", entry.getKey()); |
| workerConnector.shutdown(); |
| } |
| |
| Collection<ConnectorTaskId> taskIds = tasks.keySet(); |
| log.warn("Shutting down tasks {} uncleanly; herder should have shut down " |
| + "tasks before the Worker is stopped.", taskIds); |
| stopTasks(taskIds); |
| awaitStopTasks(taskIds); |
| |
| long timeoutMs = limit - time.milliseconds(); |
| sourceTaskOffsetCommitter.close(timeoutMs); |
| |
| offsetBackingStore.stop(); |
| |
| log.info("Worker stopped"); |
| } |
| |
| /** |
| * Add a new connector. |
| * @param connConfig connector configuration |
| * @param ctx context for the connector |
| * @param statusListener listener for notifications of connector status changes |
| * @param initialState the initial target state that the connector should be initialized to |
| */ |
| public void startConnector(ConnectorConfig connConfig, |
| ConnectorContext ctx, |
| ConnectorStatus.Listener statusListener, |
| TargetState initialState) { |
| String connName = connConfig.getString(ConnectorConfig.NAME_CONFIG); |
| Class<? extends Connector> connClass = getConnectorClass(connConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG)); |
| |
| log.info("Creating connector {} of type {}", connName, connClass.getName()); |
| |
| if (connectors.containsKey(connName)) |
| throw new ConnectException("Connector with name " + connName + " already exists"); |
| |
| final Connector connector = instantiateConnector(connClass); |
| WorkerConnector workerConnector = new WorkerConnector(connName, connector, ctx, statusListener); |
| |
| log.info("Instantiated connector {} with version {} of type {}", connName, connector.version(), connClass.getName()); |
| workerConnector.initialize(connConfig); |
| workerConnector.transitionTo(initialState); |
| |
| connectors.put(connName, workerConnector); |
| log.info("Finished creating connector {}", connName); |
| } |
| |
| /* Now that the configuration doesn't contain the actual class name, we need to be able to tell the herder whether a connector is a Sink */ |
| public boolean isSinkConnector(String connName) { |
| WorkerConnector workerConnector = connectors.get(connName); |
| return workerConnector.isSinkConnector(); |
| } |
| |
| public Connector getConnector(String connType) { |
| Class<? extends Connector> connectorClass = getConnectorClass(connType); |
| return instantiateConnector(connectorClass); |
| } |
| |
| @SuppressWarnings("unchecked") |
| private Class<? extends Connector> getConnectorClass(String connectorAlias) { |
| // Avoid the classpath scan if the full class name was provided |
| try { |
| Class<?> clazz = Class.forName(connectorAlias); |
| if (!Connector.class.isAssignableFrom(clazz)) |
| throw new ConnectException("Class " + connectorAlias + " does not implement Connector"); |
| return (Class<? extends Connector>) clazz; |
| } catch (ClassNotFoundException e) { |
| // Fall through to scan for the alias |
| } |
| |
| // Iterate over our entire classpath to find all the connectors and hopefully one of them matches the alias from the connector configration |
| Reflections reflections = new Reflections(new ConfigurationBuilder() |
| .setUrls(ClasspathHelper.forJavaClassPath())); |
| |
| Set<Class<? extends Connector>> connectors = reflections.getSubTypesOf(Connector.class); |
| |
| List<Class<? extends Connector>> results = new ArrayList<>(); |
| |
| for (Class<? extends Connector> connector: connectors) { |
| // Configuration included the class name but not package |
| if (connector.getSimpleName().equals(connectorAlias)) |
| results.add(connector); |
| |
| // Configuration included a short version of the name (i.e. FileStreamSink instead of FileStreamSinkConnector) |
| if (connector.getSimpleName().equals(connectorAlias + "Connector")) |
| results.add(connector); |
| } |
| |
| if (results.isEmpty()) |
| throw new ConnectException("Failed to find any class that implements Connector and which name matches " + connectorAlias + " available connectors are: " + connectorNames(connectors)); |
| if (results.size() > 1) { |
| throw new ConnectException("More than one connector matches alias " + connectorAlias + ". Please use full package + class name instead. Classes found: " + connectorNames(results)); |
| } |
| |
| // We just validated that we have exactly one result, so this is safe |
| return results.get(0); |
| } |
| |
| private String connectorNames(Collection<Class<? extends Connector>> connectors) { |
| StringBuilder names = new StringBuilder(); |
| for (Class<?> c : connectors) |
| names.append(c.getName()).append(", "); |
| |
| return names.substring(0, names.toString().length() - 2); |
| } |
| |
| public boolean ownsTask(ConnectorTaskId taskId) { |
| return tasks.containsKey(taskId); |
| } |
| |
| private static Connector instantiateConnector(Class<? extends Connector> connClass) { |
| try { |
| return Utils.newInstance(connClass); |
| } catch (Throwable t) { |
| // Catches normal exceptions due to instantiation errors as well as any runtime errors that |
| // may be caused by user code |
| throw new ConnectException("Failed to create connector instance", t); |
| } |
| } |
| |
| public List<Map<String, String>> connectorTaskConfigs(String connName, int maxTasks, List<String> sinkTopics) { |
| log.trace("Reconfiguring connector tasks for {}", connName); |
| |
| WorkerConnector workerConnector = connectors.get(connName); |
| if (workerConnector == null) |
| throw new ConnectException("Connector " + connName + " not found in this worker."); |
| |
| Connector connector = workerConnector.connector(); |
| List<Map<String, String>> result = new ArrayList<>(); |
| String taskClassName = connector.taskClass().getName(); |
| for (Map<String, String> taskProps : connector.taskConfigs(maxTasks)) { |
| Map<String, String> taskConfig = new HashMap<>(taskProps); // Ensure we don't modify the connector's copy of the config |
| taskConfig.put(TaskConfig.TASK_CLASS_CONFIG, taskClassName); |
| if (sinkTopics != null) |
| taskConfig.put(SinkTask.TOPICS_CONFIG, Utils.join(sinkTopics, ",")); |
| result.add(taskConfig); |
| } |
| return result; |
| } |
| |
| public void stopConnector(String connName) { |
| log.info("Stopping connector {}", connName); |
| |
| WorkerConnector connector = connectors.get(connName); |
| if (connector == null) |
| throw new ConnectException("Connector " + connName + " not found in this worker."); |
| |
| connector.shutdown(); |
| connectors.remove(connName); |
| |
| log.info("Stopped connector {}", connName); |
| } |
| |
| /** |
| * Get the IDs of the connectors currently running in this worker. |
| */ |
| public Set<String> connectorNames() { |
| return connectors.keySet(); |
| } |
| |
| public boolean isRunning(String connName) { |
| WorkerConnector connector = connectors.get(connName); |
| if (connector == null) |
| throw new ConnectException("Connector " + connName + " not found in this worker."); |
| return connector.isRunning(); |
| } |
| |
| /** |
| * Add a new task. |
| * @param id Globally unique ID for this task. |
| * @param taskConfig the parsed task configuration |
| * @param statusListener listener for notifications of task status changes |
| * @param initialState the initial target state that the task should be initialized to |
| */ |
| public void startTask(ConnectorTaskId id, |
| TaskConfig taskConfig, |
| TaskStatus.Listener statusListener, |
| TargetState initialState) { |
| log.info("Creating task {}", id); |
| |
| if (tasks.containsKey(id)) { |
| String msg = "Task already exists in this worker; the herder should not have requested " |
| + "that this : " + id; |
| log.error(msg); |
| throw new ConnectException(msg); |
| } |
| |
| Class<? extends Task> taskClass = taskConfig.getClass(TaskConfig.TASK_CLASS_CONFIG).asSubclass(Task.class); |
| final Task task = instantiateTask(taskClass); |
| log.info("Instantiated task {} with version {} of type {}", id, task.version(), taskClass.getName()); |
| |
| final WorkerTask workerTask = buildWorkerTask(id, task, statusListener, initialState); |
| |
| // Start the task before adding modifying any state, any exceptions are caught higher up the |
| // call chain and there's no cleanup to do here |
| workerTask.initialize(taskConfig); |
| executor.submit(workerTask); |
| |
| if (task instanceof SourceTask) { |
| WorkerSourceTask workerSourceTask = (WorkerSourceTask) workerTask; |
| sourceTaskOffsetCommitter.schedule(id, workerSourceTask); |
| } |
| tasks.put(id, workerTask); |
| } |
| |
| private WorkerTask buildWorkerTask(ConnectorTaskId id, |
| Task task, |
| TaskStatus.Listener statusListener, |
| TargetState initialState) { |
| // Decide which type of worker task we need based on the type of task. |
| if (task instanceof SourceTask) { |
| OffsetStorageReader offsetReader = new OffsetStorageReaderImpl(offsetBackingStore, id.connector(), |
| internalKeyConverter, internalValueConverter); |
| OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetBackingStore, id.connector(), |
| internalKeyConverter, internalValueConverter); |
| return new WorkerSourceTask(id, (SourceTask) task, statusListener, initialState, keyConverter, |
| valueConverter, producer, offsetReader, offsetWriter, config, time); |
| } else if (task instanceof SinkTask) { |
| return new WorkerSinkTask(id, (SinkTask) task, statusListener, initialState, config, keyConverter, |
| valueConverter, time); |
| } else { |
| log.error("Tasks must be a subclass of either SourceTask or SinkTask", task); |
| throw new ConnectException("Tasks must be a subclass of either SourceTask or SinkTask"); |
| } |
| } |
| |
| private static Task instantiateTask(Class<? extends Task> taskClass) { |
| try { |
| return Utils.newInstance(taskClass); |
| } catch (KafkaException e) { |
| throw new ConnectException("Task class not found", e); |
| } |
| } |
| |
| public void stopTasks(Collection<ConnectorTaskId> ids) { |
| for (ConnectorTaskId id : ids) |
| stopTask(getTask(id)); |
| } |
| |
| public void awaitStopTasks(Collection<ConnectorTaskId> ids) { |
| long now = time.milliseconds(); |
| long deadline = now + config.getLong(WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG); |
| for (ConnectorTaskId id : ids) { |
| long remaining = Math.max(0, deadline - time.milliseconds()); |
| awaitStopTask(getTask(id), remaining); |
| } |
| } |
| |
| private void awaitStopTask(WorkerTask task, long timeout) { |
| if (!task.awaitStop(timeout)) { |
| log.error("Graceful stop of task {} failed.", task.id()); |
| task.cancel(); |
| } |
| tasks.remove(task.id()); |
| } |
| |
| private void stopTask(WorkerTask task) { |
| log.info("Stopping task {}", task.id()); |
| if (task instanceof WorkerSourceTask) |
| sourceTaskOffsetCommitter.remove(task.id()); |
| task.stop(); |
| } |
| |
| public void stopAndAwaitTask(ConnectorTaskId id) { |
| WorkerTask task = getTask(id); |
| stopTask(task); |
| awaitStopTask(task, config.getLong(WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG)); |
| } |
| |
| /** |
| * Get the IDs of the tasks currently running in this worker. |
| */ |
| public Set<ConnectorTaskId> taskIds() { |
| return tasks.keySet(); |
| } |
| |
| private WorkerTask getTask(ConnectorTaskId id) { |
| WorkerTask task = tasks.get(id); |
| if (task == null) { |
| log.error("Task not found: " + id); |
| throw new ConnectException("Task not found: " + id); |
| } |
| return task; |
| } |
| |
| public Converter getInternalKeyConverter() { |
| return internalKeyConverter; |
| } |
| |
| public Converter getInternalValueConverter() { |
| return internalValueConverter; |
| } |
| |
| public String workerId() { |
| return workerId; |
| } |
| |
| public boolean ownsConnector(String connName) { |
| return this.connectors.containsKey(connName); |
| } |
| |
| public void setTargetState(String connName, TargetState state) { |
| log.info("Setting connector {} state to {}", connName, state); |
| |
| WorkerConnector connector = connectors.get(connName); |
| if (connector != null) |
| connector.transitionTo(state); |
| |
| for (Map.Entry<ConnectorTaskId, WorkerTask> taskEntry : tasks.entrySet()) { |
| if (taskEntry.getKey().connector().equals(connName)) |
| taskEntry.getValue().transitionTo(state); |
| } |
| } |
| |
| } |