| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * <p/> |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * <p/> |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| **/ |
| |
| package org.apache.kafka.connect.runtime.standalone; |
| |
| import org.apache.kafka.connect.errors.AlreadyExistsException; |
| import org.apache.kafka.connect.errors.ConnectException; |
| import org.apache.kafka.connect.errors.NotFoundException; |
| import org.apache.kafka.connect.runtime.AbstractHerder; |
| import org.apache.kafka.connect.runtime.ConnectorConfig; |
| import org.apache.kafka.connect.runtime.HerderConnectorContext; |
| import org.apache.kafka.connect.runtime.TaskConfig; |
| import org.apache.kafka.connect.runtime.Worker; |
| import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo; |
| import org.apache.kafka.connect.runtime.rest.entities.TaskInfo; |
| import org.apache.kafka.connect.storage.MemoryStatusBackingStore; |
| import org.apache.kafka.connect.storage.StatusBackingStore; |
| import org.apache.kafka.connect.util.Callback; |
| import org.apache.kafka.connect.util.ConnectorTaskId; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| |
| |
| /** |
| * Single process, in-memory "herder". Useful for a standalone Kafka Connect process. |
| */ |
| public class StandaloneHerder extends AbstractHerder { |
| private static final Logger log = LoggerFactory.getLogger(StandaloneHerder.class); |
| |
| private HashMap<String, ConnectorState> connectors = new HashMap<>(); |
| |
| public StandaloneHerder(Worker worker) { |
| this(worker.workerId(), worker, new MemoryStatusBackingStore()); |
| } |
| |
| // visible for testing |
| StandaloneHerder(String workerId, |
| Worker worker, |
| StatusBackingStore statusBackingStore) { |
| super(worker, statusBackingStore, workerId); |
| } |
| |
| public synchronized void start() { |
| log.info("Herder starting"); |
| startServices(); |
| log.info("Herder started"); |
| } |
| |
| public synchronized void stop() { |
| log.info("Herder stopping"); |
| |
| // There's no coordination/hand-off to do here since this is all standalone. Instead, we |
| // should just clean up the stuff we normally would, i.e. cleanly checkpoint and shutdown all |
| // the tasks. |
| for (String connName : new HashSet<>(connectors.keySet())) { |
| removeConnectorTasks(connName); |
| try { |
| worker.stopConnector(connName); |
| } catch (ConnectException e) { |
| log.error("Error shutting down connector {}: ", connName, e); |
| } |
| } |
| connectors.clear(); |
| |
| log.info("Herder stopped"); |
| } |
| |
| @Override |
| public int generation() { |
| return 0; |
| } |
| |
| @Override |
| public synchronized void connectors(Callback<Collection<String>> callback) { |
| callback.onCompletion(null, new ArrayList<>(connectors.keySet())); |
| } |
| |
| @Override |
| public synchronized void connectorInfo(String connName, Callback<ConnectorInfo> callback) { |
| ConnectorState state = connectors.get(connName); |
| if (state == null) { |
| callback.onCompletion(new NotFoundException("Connector " + connName + " not found"), null); |
| return; |
| } |
| callback.onCompletion(null, createConnectorInfo(state)); |
| } |
| |
| private ConnectorInfo createConnectorInfo(ConnectorState state) { |
| if (state == null) |
| return null; |
| |
| List<ConnectorTaskId> taskIds = new ArrayList<>(); |
| for (int i = 0; i < state.taskConfigs.size(); i++) |
| taskIds.add(new ConnectorTaskId(state.name, i)); |
| return new ConnectorInfo(state.name, state.configOriginals, taskIds); |
| } |
| |
| @Override |
| public void connectorConfig(String connName, final Callback<Map<String, String>> callback) { |
| // Subset of connectorInfo, so piggy back on that implementation |
| connectorInfo(connName, new Callback<ConnectorInfo>() { |
| @Override |
| public void onCompletion(Throwable error, ConnectorInfo result) { |
| if (error != null) { |
| callback.onCompletion(error, null); |
| return; |
| } |
| callback.onCompletion(null, result.config()); |
| } |
| }); |
| } |
| |
| @Override |
| public synchronized void putConnectorConfig(String connName, final Map<String, String> config, |
| boolean allowReplace, |
| final Callback<Created<ConnectorInfo>> callback) { |
| try { |
| boolean created = false; |
| if (connectors.containsKey(connName)) { |
| if (!allowReplace) { |
| callback.onCompletion(new AlreadyExistsException("Connector " + connName + " already exists"), null); |
| return; |
| } |
| if (config == null) // Deletion, kill tasks as well |
| removeConnectorTasks(connName); |
| worker.stopConnector(connName); |
| if (config == null) { |
| connectors.remove(connName); |
| onDeletion(connName); |
| } |
| } else { |
| if (config == null) { |
| // Deletion, must already exist |
| callback.onCompletion(new NotFoundException("Connector " + connName + " not found", null), null); |
| return; |
| } |
| created = true; |
| } |
| if (config != null) { |
| startConnector(config); |
| updateConnectorTasks(connName); |
| } |
| if (config != null) |
| callback.onCompletion(null, new Created<>(created, createConnectorInfo(connectors.get(connName)))); |
| else |
| callback.onCompletion(null, new Created<ConnectorInfo>(false, null)); |
| } catch (ConnectException e) { |
| callback.onCompletion(e, null); |
| } |
| |
| } |
| |
| @Override |
| public synchronized void requestTaskReconfiguration(String connName) { |
| if (!worker.connectorNames().contains(connName)) { |
| log.error("Task that requested reconfiguration does not exist: {}", connName); |
| return; |
| } |
| updateConnectorTasks(connName); |
| } |
| |
| @Override |
| public synchronized void taskConfigs(String connName, Callback<List<TaskInfo>> callback) { |
| ConnectorState state = connectors.get(connName); |
| if (state == null) { |
| callback.onCompletion(new NotFoundException("Connector " + connName + " not found", null), null); |
| return; |
| } |
| |
| List<TaskInfo> result = new ArrayList<>(); |
| for (int i = 0; i < state.taskConfigs.size(); i++) { |
| TaskInfo info = new TaskInfo(new ConnectorTaskId(connName, i), state.taskConfigs.get(i)); |
| result.add(info); |
| } |
| callback.onCompletion(null, result); |
| } |
| |
| @Override |
| public void putTaskConfigs(String connName, List<Map<String, String>> configs, Callback<Void> callback) { |
| throw new UnsupportedOperationException("Kafka Connect in standalone mode does not support externally setting task configurations."); |
| } |
| |
| /** |
| * Start a connector in the worker and record its state. |
| * @param connectorProps new connector configuration |
| * @return the connector name |
| */ |
| private String startConnector(Map<String, String> connectorProps) { |
| ConnectorConfig connConfig = new ConnectorConfig(connectorProps); |
| String connName = connConfig.getString(ConnectorConfig.NAME_CONFIG); |
| ConnectorState state = connectors.get(connName); |
| worker.startConnector(connConfig, new HerderConnectorContext(this, connName), this); |
| if (state == null) { |
| connectors.put(connName, new ConnectorState(connectorProps, connConfig)); |
| } else { |
| state.configOriginals = connectorProps; |
| state.config = connConfig; |
| } |
| return connName; |
| } |
| |
| |
| private List<Map<String, String>> recomputeTaskConfigs(String connName) { |
| ConnectorState state = connectors.get(connName); |
| return worker.connectorTaskConfigs(connName, |
| state.config.getInt(ConnectorConfig.TASKS_MAX_CONFIG), |
| state.config.getList(ConnectorConfig.TOPICS_CONFIG)); |
| } |
| |
| private void createConnectorTasks(String connName) { |
| ConnectorState state = connectors.get(connName); |
| int index = 0; |
| for (Map<String, String> taskConfigMap : state.taskConfigs) { |
| ConnectorTaskId taskId = new ConnectorTaskId(connName, index); |
| TaskConfig config = new TaskConfig(taskConfigMap); |
| try { |
| worker.startTask(taskId, config, this); |
| } catch (Throwable e) { |
| log.error("Failed to add task {}: ", taskId, e); |
| // Swallow this so we can continue updating the rest of the tasks |
| // FIXME what's the proper response? Kill all the tasks? Consider this the same as a task |
| // that died after starting successfully. |
| } |
| index++; |
| } |
| } |
| |
| private Set<ConnectorTaskId> tasksFor(ConnectorState state) { |
| Set<ConnectorTaskId> tasks = new HashSet<>(); |
| for (int i = 0; i < state.taskConfigs.size(); i++) |
| tasks.add(new ConnectorTaskId(state.name, i)); |
| return tasks; |
| } |
| |
| private void removeConnectorTasks(String connName) { |
| ConnectorState state = connectors.get(connName); |
| Set<ConnectorTaskId> tasks = tasksFor(state); |
| if (!tasks.isEmpty()) { |
| worker.stopTasks(tasks); |
| worker.awaitStopTasks(tasks); |
| state.taskConfigs = new ArrayList<>(); |
| } |
| } |
| |
| private void updateConnectorTasks(String connName) { |
| List<Map<String, String>> newTaskConfigs = recomputeTaskConfigs(connName); |
| ConnectorState state = connectors.get(connName); |
| if (!newTaskConfigs.equals(state.taskConfigs)) { |
| removeConnectorTasks(connName); |
| state.taskConfigs = newTaskConfigs; |
| createConnectorTasks(connName); |
| } |
| } |
| |
| |
| private static class ConnectorState { |
| public String name; |
| public Map<String, String> configOriginals; |
| public ConnectorConfig config; |
| List<Map<String, String>> taskConfigs; |
| |
| public ConnectorState(Map<String, String> configOriginals, ConnectorConfig config) { |
| this.name = config.getString(ConnectorConfig.NAME_CONFIG); |
| this.configOriginals = configOriginals; |
| this.config = config; |
| this.taskConfigs = new ArrayList<>(); |
| } |
| } |
| } |