| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * <p/> |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * <p/> |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| **/ |
| package org.apache.kafka.connect.runtime; |
| |
| import org.apache.kafka.common.config.Config; |
| import org.apache.kafka.common.config.ConfigDef; |
| import org.apache.kafka.common.config.ConfigDef.ConfigKey; |
| import org.apache.kafka.common.config.ConfigValue; |
| import org.apache.kafka.connect.connector.Connector; |
| import org.apache.kafka.connect.errors.NotFoundException; |
| import org.apache.kafka.connect.runtime.rest.entities.ConfigInfo; |
| import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos; |
| import org.apache.kafka.connect.runtime.rest.entities.ConfigKeyInfo; |
| import org.apache.kafka.connect.runtime.rest.entities.ConfigValueInfo; |
| import org.apache.kafka.connect.runtime.rest.entities.ConnectorPluginInfo; |
| import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo; |
| import org.apache.kafka.connect.storage.StatusBackingStore; |
| import org.apache.kafka.connect.tools.VerifiableSinkConnector; |
| import org.apache.kafka.connect.tools.VerifiableSourceConnector; |
| import org.apache.kafka.connect.util.ConnectorTaskId; |
| import org.reflections.Reflections; |
| import org.reflections.util.ClasspathHelper; |
| import org.reflections.util.ConfigurationBuilder; |
| |
| import java.io.ByteArrayOutputStream; |
| import java.io.PrintStream; |
| import java.io.UnsupportedEncodingException; |
| import java.lang.reflect.Modifier; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.ConcurrentHashMap; |
| |
| /** |
| * Abstract Herder implementation which handles connector/task lifecycle tracking. Extensions |
| * must invoke the lifecycle hooks appropriately. |
| * |
| * This class takes the following approach for sending status updates to the backing store: |
| * |
| * 1) When the connector or task is starting, we overwrite the previous state blindly. This ensures that |
| * every rebalance will reset the state of tasks to the proper state. The intuition is that there should |
| * be less chance of write conflicts when the worker has just received its assignment and is starting tasks. |
| * In particular, this prevents us from depending on the generation absolutely. If the group disappears |
| * and the generation is reset, then we'll overwrite the status information with the older (and larger) |
| * generation with the updated one. The danger of this approach is that slow starting tasks may cause the |
| * status to be overwritten after a rebalance has completed. |
| * |
| * 2) If the connector or task fails or is shutdown, we use {@link StatusBackingStore#putSafe(ConnectorStatus)}, |
| * which provides a little more protection if the worker is no longer in the group (in which case the |
| * task may have already been started on another worker). Obviously this is still racy. If the task has just |
| * started on another worker, we may not have the updated status cached yet. In this case, we'll overwrite |
| * the value which will cause the state to be inconsistent (most likely until the next rebalance). Until |
| * we have proper producer groups with fenced groups, there is not much else we can do. |
| */ |
| public abstract class AbstractHerder implements Herder, TaskStatus.Listener, ConnectorStatus.Listener { |
| |
| protected final Worker worker; |
| protected final StatusBackingStore statusBackingStore; |
| private final String workerId; |
| |
| private Map<String, Connector> tempConnectors = new ConcurrentHashMap<>(); |
| private static final List<Class<? extends Connector>> SKIPPED_CONNECTORS = Arrays.<Class<? extends Connector>>asList(VerifiableSourceConnector.class, VerifiableSinkConnector.class); |
| private static List<ConnectorPluginInfo> validConnectorPlugins; |
| |
| public AbstractHerder(Worker worker, StatusBackingStore statusBackingStore, String workerId) { |
| this.worker = worker; |
| this.statusBackingStore = statusBackingStore; |
| this.workerId = workerId; |
| } |
| |
| protected abstract int generation(); |
| |
| protected void startServices() { |
| this.statusBackingStore.start(); |
| } |
| |
| protected void stopServices() { |
| this.statusBackingStore.stop(); |
| } |
| |
| @Override |
| public void onStartup(String connector) { |
| statusBackingStore.put(new ConnectorStatus(connector, ConnectorStatus.State.RUNNING, |
| workerId, generation())); |
| } |
| |
| @Override |
| public void onShutdown(String connector) { |
| statusBackingStore.putSafe(new ConnectorStatus(connector, ConnectorStatus.State.UNASSIGNED, |
| workerId, generation())); |
| } |
| |
| @Override |
| public void onFailure(String connector, Throwable cause) { |
| statusBackingStore.putSafe(new ConnectorStatus(connector, ConnectorStatus.State.FAILED, |
| trace(cause), workerId, generation())); |
| } |
| |
| @Override |
| public void onStartup(ConnectorTaskId id) { |
| statusBackingStore.put(new TaskStatus(id, TaskStatus.State.RUNNING, workerId, generation())); |
| } |
| |
| @Override |
| public void onFailure(ConnectorTaskId id, Throwable cause) { |
| statusBackingStore.putSafe(new TaskStatus(id, TaskStatus.State.FAILED, workerId, generation(), trace(cause))); |
| } |
| |
| @Override |
| public void onShutdown(ConnectorTaskId id) { |
| statusBackingStore.putSafe(new TaskStatus(id, TaskStatus.State.UNASSIGNED, workerId, generation())); |
| } |
| |
| @Override |
| public void onDeletion(String connector) { |
| for (TaskStatus status : statusBackingStore.getAll(connector)) |
| statusBackingStore.put(new TaskStatus(status.id(), TaskStatus.State.DESTROYED, workerId, generation())); |
| statusBackingStore.put(new ConnectorStatus(connector, ConnectorStatus.State.DESTROYED, workerId, generation())); |
| } |
| |
| @Override |
| public ConnectorStateInfo connectorStatus(String connName) { |
| ConnectorStatus connector = statusBackingStore.get(connName); |
| if (connector == null) |
| throw new NotFoundException("No status found for connector " + connName); |
| |
| Collection<TaskStatus> tasks = statusBackingStore.getAll(connName); |
| |
| ConnectorStateInfo.ConnectorState connectorState = new ConnectorStateInfo.ConnectorState( |
| connector.state().toString(), connector.workerId(), connector.trace()); |
| List<ConnectorStateInfo.TaskState> taskStates = new ArrayList<>(); |
| |
| for (TaskStatus status : tasks) { |
| taskStates.add(new ConnectorStateInfo.TaskState(status.id().task(), |
| status.state().toString(), status.workerId(), status.trace())); |
| } |
| |
| Collections.sort(taskStates); |
| |
| return new ConnectorStateInfo(connName, connectorState, taskStates); |
| } |
| |
| @Override |
| public ConnectorStateInfo.TaskState taskStatus(ConnectorTaskId id) { |
| TaskStatus status = statusBackingStore.get(id); |
| |
| if (status == null) |
| throw new NotFoundException("No status found for task " + id); |
| |
| return new ConnectorStateInfo.TaskState(id.task(), status.state().toString(), |
| status.workerId(), status.trace()); |
| } |
| |
| |
| @Override |
| public ConfigInfos validateConfigs(String connType, Map<String, String> connectorConfig) { |
| ConfigDef connectorConfigDef = ConnectorConfig.configDef(); |
| List<ConfigValue> connectorConfigValues = connectorConfigDef.validate(connectorConfig); |
| ConfigInfos result = generateResult(connType, connectorConfigDef.configKeys(), connectorConfigValues, Collections.<String>emptyList()); |
| |
| if (result.errorCount() != 0) { |
| return result; |
| } |
| |
| Connector connector = getConnector(connType); |
| |
| Config config = connector.validate(connectorConfig); |
| ConfigDef configDef = connector.config(); |
| Map<String, ConfigKey> configKeys = configDef.configKeys(); |
| List<ConfigValue> configValues = config.configValues(); |
| |
| Map<String, ConfigKey> resultConfigKeys = new HashMap<>(configKeys); |
| resultConfigKeys.putAll(connectorConfigDef.configKeys()); |
| configValues.addAll(connectorConfigValues); |
| |
| List<String> allGroups = new LinkedList<>(connectorConfigDef.groups()); |
| List<String> groups = configDef.groups(); |
| allGroups.addAll(groups); |
| |
| return generateResult(connType, resultConfigKeys, configValues, allGroups); |
| } |
| |
| public static List<ConnectorPluginInfo> connectorPlugins() { |
| if (validConnectorPlugins != null) { |
| return validConnectorPlugins; |
| } |
| |
| Reflections reflections = new Reflections(new ConfigurationBuilder().setUrls(ClasspathHelper.forJavaClassPath())); |
| Set<Class<? extends Connector>> connectorClasses = reflections.getSubTypesOf(Connector.class); |
| connectorClasses.removeAll(SKIPPED_CONNECTORS); |
| List<ConnectorPluginInfo> connectorPlugins = new LinkedList<>(); |
| for (Class<? extends Connector> connectorClass: connectorClasses) { |
| int mod = connectorClass.getModifiers(); |
| if (!Modifier.isAbstract(mod) && !Modifier.isInterface(mod)) { |
| connectorPlugins.add(new ConnectorPluginInfo(connectorClass.getCanonicalName())); |
| } |
| } |
| validConnectorPlugins = connectorPlugins; |
| return connectorPlugins; |
| } |
| |
| // public for testing |
| public static ConfigInfos generateResult(String connType, Map<String, ConfigKey> configKeys, List<ConfigValue> configValues, List<String> groups) { |
| int errorCount = 0; |
| List<ConfigInfo> configInfoList = new LinkedList<>(); |
| |
| Map<String, ConfigValue> configValueMap = new HashMap<>(); |
| for (ConfigValue configValue: configValues) { |
| String configName = configValue.name(); |
| configValueMap.put(configName, configValue); |
| if (!configKeys.containsKey(configName)) { |
| configValue.addErrorMessage("Configuration is not defined: " + configName); |
| configInfoList.add(new ConfigInfo(null, convertConfigValue(configValue))); |
| } |
| } |
| |
| for (String configName: configKeys.keySet()) { |
| ConfigKeyInfo configKeyInfo = convertConfigKey(configKeys.get(configName)); |
| ConfigValueInfo configValueInfo = null; |
| if (configValueMap.containsKey(configName)) { |
| ConfigValue configValue = configValueMap.get(configName); |
| configValueInfo = convertConfigValue(configValue); |
| errorCount += configValue.errorMessages().size(); |
| } |
| configInfoList.add(new ConfigInfo(configKeyInfo, configValueInfo)); |
| } |
| return new ConfigInfos(connType, errorCount, groups, configInfoList); |
| } |
| |
| private static ConfigKeyInfo convertConfigKey(ConfigKey configKey) { |
| String name = configKey.name; |
| String type = configKey.type.name(); |
| Object defaultValue = configKey.defaultValue; |
| boolean required = false; |
| if (defaultValue == ConfigDef.NO_DEFAULT_VALUE) { |
| required = true; |
| } |
| String importance = configKey.importance.name(); |
| String documentation = configKey.documentation; |
| String group = configKey.group; |
| int orderInGroup = configKey.orderInGroup; |
| String width = configKey.width.name(); |
| String displayName = configKey.displayName; |
| List<String> dependents = configKey.dependents; |
| return new ConfigKeyInfo(name, type, required, defaultValue, importance, documentation, group, orderInGroup, width, displayName, dependents); |
| } |
| |
| private static ConfigValueInfo convertConfigValue(ConfigValue configValue) { |
| return new ConfigValueInfo(configValue.name(), configValue.value(), configValue.recommendedValues(), configValue.errorMessages(), configValue.visible()); |
| } |
| |
| private Connector getConnector(String connType) { |
| if (tempConnectors.containsKey(connType)) { |
| return tempConnectors.get(connType); |
| } else { |
| Connector connector = worker.getConnector(connType); |
| tempConnectors.put(connType, connector); |
| return connector; |
| } |
| } |
| |
| private String trace(Throwable t) { |
| ByteArrayOutputStream output = new ByteArrayOutputStream(); |
| t.printStackTrace(new PrintStream(output)); |
| try { |
| return output.toString("UTF-8"); |
| } catch (UnsupportedEncodingException e) { |
| return null; |
| } |
| } |
| } |