blob: f1993a7e7f157f426d09d4e9b102d20017d576d1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.runtime;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.ConfigTransformer;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.provider.DirectoryConfigProvider;
import org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredLoginCallbackHandler;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.connector.policy.NoneConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.connector.policy.PrincipalConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.NotFoundException;
import org.apache.kafka.connect.runtime.distributed.SampleConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.runtime.isolation.LoaderSwap;
import org.apache.kafka.connect.runtime.isolation.PluginDesc;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.rest.entities.ConfigInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
import org.apache.kafka.connect.runtime.rest.entities.ConfigKeyInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConfigValueInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffset;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorType;
import org.apache.kafka.connect.runtime.rest.errors.BadRequestException;
import org.apache.kafka.connect.storage.ClusterConfigState;
import org.apache.kafka.connect.storage.ConfigBackingStore;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.predicates.Predicate;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.FutureCallback;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import static org.apache.kafka.connect.runtime.AbstractHerder.keysWithVariableValues;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.CALLS_REAL_METHODS;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.withSettings;
@RunWith(MockitoJUnitRunner.StrictStubs.class)
public class AbstractHerderTest {
// Connector name used by the static SNAPSHOT fixtures below (distinct from the per-instance connectorName).
private static final String CONN1 = "sourceA";
// Ids of the three tasks of CONN1 (indexes 0, 1 and 2); used as the keys of TASK_CONFIGS_MAP.
private static final ConnectorTaskId TASK0 = new ConnectorTaskId(CONN1, 0);
private static final ConnectorTaskId TASK1 = new ConnectorTaskId(CONN1, 1);
private static final ConnectorTaskId TASK2 = new ConnectorTaskId(CONN1, 2);
// Written into CONN1_CONFIG (as a string) under ConnectorConfig.TASKS_MAX_CONFIG.
private static final Integer MAX_TASKS = 3;
// Connector configuration for CONN1; populated by the static initializer that follows the constants.
private static final Map<String, String> CONN1_CONFIG = new HashMap<>();
// Arbitrary config keys and literal values used by the tests.
private static final String TEST_KEY = "testKey";
private static final String TEST_KEY2 = "testKey2";
private static final String TEST_KEY3 = "testKey3";
private static final String TEST_VAL = "testVal";
private static final String TEST_VAL2 = "testVal2";
// Values written in the ${file:<path>:<key>} placeholder form.
private static final String TEST_REF = "${file:/tmp/somefile.txt:somevar}";
private static final String TEST_REF2 = "${file:/tmp/somefile2.txt:somevar2}";
private static final String TEST_REF3 = "${file:/tmp/somefile3.txt:somevar3}";
static {
// Name, task limit, topics and connector class, plus the three placeholder-valued test keys.
CONN1_CONFIG.put(ConnectorConfig.NAME_CONFIG, CONN1);
CONN1_CONFIG.put(ConnectorConfig.TASKS_MAX_CONFIG, MAX_TASKS.toString());
CONN1_CONFIG.put(SinkConnectorConfig.TOPICS_CONFIG, "foo,bar");
CONN1_CONFIG.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, SampleSourceConnector.class.getName());
CONN1_CONFIG.put(TEST_KEY, TEST_REF);
CONN1_CONFIG.put(TEST_KEY2, TEST_REF2);
CONN1_CONFIG.put(TEST_KEY3, TEST_REF3);
}
// Single task configuration shared by every task of CONN1.
private static final Map<String, String> TASK_CONFIG = new HashMap<>();
static {
TASK_CONFIG.put(TaskConfig.TASK_CLASS_CONFIG, SampleSourceConnector.SampleSourceTask.class.getName());
TASK_CONFIG.put(TEST_KEY, TEST_REF);
}
// List form of the task configs: the same TASK_CONFIG instance repeated three times.
private static final List<Map<String, String>> TASK_CONFIGS = new ArrayList<>();
static {
TASK_CONFIGS.add(TASK_CONFIG);
TASK_CONFIGS.add(TASK_CONFIG);
TASK_CONFIGS.add(TASK_CONFIG);
}
// Map form of the task configs: TASK0..TASK2 all point at the same TASK_CONFIG instance.
private static final HashMap<ConnectorTaskId, Map<String, String>> TASK_CONFIGS_MAP = new HashMap<>();
static {
TASK_CONFIGS_MAP.put(TASK0, TASK_CONFIG);
TASK_CONFIGS_MAP.put(TASK1, TASK_CONFIG);
TASK_CONFIGS_MAP.put(TASK2, TASK_CONFIG);
}
// Cluster state containing CONN1 with CONN1_CONFIG, a STARTED target state and TASK_CONFIGS_MAP.
// NOTE(review): the arguments are positional and the ClusterConfigState constructor is not visible here;
// confirm the meaning of each slot against that class before editing.
private static final ClusterConfigState SNAPSHOT = new ClusterConfigState(
1,
null,
Collections.singletonMap(CONN1, 3),
Collections.singletonMap(CONN1, CONN1_CONFIG),
Collections.singletonMap(CONN1, TargetState.STARTED),
TASK_CONFIGS_MAP,
Collections.emptyMap(),
Collections.emptyMap(),
Collections.emptySet(),
Collections.emptySet());
// Same as SNAPSHOT except that the slot holding TASK_CONFIGS_MAP is an empty map.
private static final ClusterConfigState SNAPSHOT_NO_TASKS = new ClusterConfigState(
1,
null,
Collections.singletonMap(CONN1, 3),
Collections.singletonMap(CONN1, CONN1_CONFIG),
Collections.singletonMap(CONN1, TargetState.STARTED),
Collections.emptyMap(),
Collections.emptyMap(),
Collections.emptyMap(),
Collections.emptySet(),
Collections.emptySet());
// Per-test values passed to the AbstractHerder constructor and to the status objects built by the tests.
private final String workerId = "workerId";
private final String kafkaClusterId = "I4ZmrWqfT2e-upky_4fdPA";
private final int generation = 5;
private final String connectorName = "connector";
// Default override policy handed to the herder by most tests.
private final ConnectorClientConfigOverridePolicy noneConnectorClientConfigOverridePolicy = new NoneConnectorClientConfigOverridePolicy();
// Mockito mocks; the class runs under MockitoJUnitRunner.StrictStubs.
@Mock private Worker worker;
@Mock private WorkerConfigTransformer transformer;
@Mock private ConfigBackingStore configStore;
@Mock private StatusBackingStore statusStore;
@Mock private ClassLoader classLoader;
@Mock private LoaderSwap loaderSwap;
@Mock private Plugins plugins;
/**
 * The herder should list exactly the connector names found in the config store snapshot.
 */
@Test
public void testConnectors() {
    AbstractHerder herder = mock(
        AbstractHerder.class,
        withSettings()
            .useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
            .defaultAnswer(CALLS_REAL_METHODS));
    when(configStore.snapshot()).thenReturn(SNAPSHOT);

    // Compare as sets so the assertion does not depend on the returned collection type.
    Set<String> listedConnectors = new HashSet<>(herder.connectors());

    assertEquals(Collections.singleton(CONN1), listedConnectors);
}
/**
 * Stopping the herder's services must close the client config override policy it was built with.
 */
@Test
public void testConnectorClientConfigOverridePolicyClose() {
    // Deliberately not named after the class-level "none" policy field: this is a different policy type
    // and shadowing that field made the test misleading to read.
    SampleConnectorClientConfigOverridePolicy sampleOverridePolicy = new SampleConnectorClientConfigOverridePolicy();
    AbstractHerder herder = mock(AbstractHerder.class, withSettings()
        .useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, sampleOverridePolicy)
        .defaultAnswer(CALLS_REAL_METHODS));

    herder.stopServices();

    assertTrue(sampleOverridePolicy.isClosed());
}
/**
 * Checks that {@code connectorStatus} combines the stored connector and task statuses and reports
 * {@code SOURCE} as the type when the plugin lookup yields a source connector.
 */
@Test
public void testConnectorStatus() {
    // Fixtures handed back by the stubbed raw-config lookup and status store.
    final ConnectorTaskId firstTaskId = new ConnectorTaskId(connectorName, 0);
    final Map<String, String> storedConfig = Collections.singletonMap(
        ConnectorConfig.CONNECTOR_CLASS_CONFIG, SampleSourceConnector.class.getName());
    final ConnectorStatus storedConnectorStatus =
        new ConnectorStatus(connectorName, AbstractStatus.State.RUNNING, workerId, generation);
    final TaskStatus storedTaskStatus =
        new TaskStatus(firstTaskId, AbstractStatus.State.UNASSIGNED, workerId, generation);

    AbstractHerder herder = mock(
        AbstractHerder.class,
        withSettings()
            .useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
            .defaultAnswer(CALLS_REAL_METHODS));
    when(plugins.newConnector(anyString())).thenReturn(new SampleSourceConnector());
    when(herder.plugins()).thenReturn(plugins);
    when(herder.rawConfig(connectorName)).thenReturn(storedConfig);
    when(statusStore.get(connectorName)).thenReturn(storedConnectorStatus);
    when(statusStore.getAll(connectorName)).thenReturn(Collections.singletonList(storedTaskStatus));

    ConnectorStateInfo stateInfo = herder.connectorStatus(connectorName);

    // Connector-level view
    assertEquals(connectorName, stateInfo.name());
    assertEquals(ConnectorType.SOURCE, stateInfo.type());
    assertEquals("RUNNING", stateInfo.connector().state());
    assertEquals(1, stateInfo.tasks().size());
    assertEquals(workerId, stateInfo.connector().workerId());

    // Task-level view
    ConnectorStateInfo.TaskState reportedTask = stateInfo.tasks().get(0);
    assertEquals(0, reportedTask.id());
    assertEquals("UNASSIGNED", reportedTask.state());
    assertEquals(workerId, reportedTask.workerId());
}
/**
 * Checks that {@code connectorStatus} still returns the stored statuses, with type {@code UNKNOWN},
 * when instantiating the connector plugin throws a {@link ConnectException}.
 */
@Test
public void testConnectorStatusMissingPlugin() {
    // Fixtures handed back by the stubbed raw-config lookup and status store.
    final ConnectorTaskId firstTaskId = new ConnectorTaskId(connectorName, 0);
    final Map<String, String> storedConfig =
        Collections.singletonMap(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "missing");
    final ConnectorStatus storedConnectorStatus =
        new ConnectorStatus(connectorName, AbstractStatus.State.RUNNING, workerId, generation);
    final TaskStatus storedTaskStatus =
        new TaskStatus(firstTaskId, AbstractStatus.State.UNASSIGNED, workerId, generation);

    AbstractHerder herder = mock(
        AbstractHerder.class,
        withSettings()
            .useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
            .defaultAnswer(CALLS_REAL_METHODS));
    // The plugin lookup fails for every class name.
    when(plugins.newConnector(anyString())).thenThrow(new ConnectException("Unable to find class"));
    when(herder.plugins()).thenReturn(plugins);
    when(herder.rawConfig(connectorName)).thenReturn(storedConfig);
    when(statusStore.get(connectorName)).thenReturn(storedConnectorStatus);
    when(statusStore.getAll(connectorName)).thenReturn(Collections.singletonList(storedTaskStatus));

    ConnectorStateInfo stateInfo = herder.connectorStatus(connectorName);

    // Connector-level view
    assertEquals(connectorName, stateInfo.name());
    assertEquals(ConnectorType.UNKNOWN, stateInfo.type());
    assertEquals("RUNNING", stateInfo.connector().state());
    assertEquals(1, stateInfo.tasks().size());
    assertEquals(workerId, stateInfo.connector().workerId());

    // Task-level view
    ConnectorStateInfo.TaskState reportedTask = stateInfo.tasks().get(0);
    assertEquals(0, reportedTask.id());
    assertEquals("UNASSIGNED", reportedTask.state());
    assertEquals(workerId, reportedTask.workerId());
}
/**
 * {@code connectorInfo} should expose the name, config, task ids and type of a connector
 * that is present in the config store snapshot.
 */
@Test
public void testConnectorInfo() {
    final List<ConnectorTaskId> expectedTasks = Arrays.asList(TASK0, TASK1, TASK2);

    AbstractHerder herder = mock(
        AbstractHerder.class,
        withSettings()
            .useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
            .defaultAnswer(CALLS_REAL_METHODS));
    when(plugins.newConnector(anyString())).thenReturn(new SampleSourceConnector());
    when(herder.plugins()).thenReturn(plugins);
    when(configStore.snapshot()).thenReturn(SNAPSHOT);

    ConnectorInfo connectorInfo = herder.connectorInfo(CONN1);

    assertEquals(CONN1, connectorInfo.name());
    assertEquals(CONN1_CONFIG, connectorInfo.config());
    assertEquals(expectedTasks, connectorInfo.tasks());
    assertEquals(ConnectorType.SOURCE, connectorInfo.type());
}
/**
 * Pausing a connector known to the config store should write the PAUSED target state for it.
 */
@Test
public void testPauseConnector() {
    AbstractHerder herder = mock(
        AbstractHerder.class,
        withSettings()
            .useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
            .defaultAnswer(CALLS_REAL_METHODS));
    // The connector exists as far as the config store is concerned.
    when(configStore.contains(CONN1)).thenReturn(true);

    herder.pauseConnector(CONN1);

    verify(configStore).putTargetState(CONN1, TargetState.PAUSED);
}
/**
 * Resuming a connector known to the config store should write the STARTED target state for it.
 */
@Test
public void testResumeConnector() {
    AbstractHerder herder = mock(
        AbstractHerder.class,
        withSettings()
            .useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
            .defaultAnswer(CALLS_REAL_METHODS));
    // The connector exists as far as the config store is concerned.
    when(configStore.contains(CONN1)).thenReturn(true);

    herder.resumeConnector(CONN1);

    verify(configStore).putTargetState(CONN1, TargetState.STARTED);
}
/**
 * {@code connectorInfo} should still return name, config and task ids, with type {@code UNKNOWN},
 * when instantiating the connector plugin throws a {@link ConnectException}.
 */
@Test
public void testConnectorInfoMissingPlugin() {
    final List<ConnectorTaskId> expectedTasks = Arrays.asList(TASK0, TASK1, TASK2);

    AbstractHerder herder = mock(
        AbstractHerder.class,
        withSettings()
            .useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
            .defaultAnswer(CALLS_REAL_METHODS));
    // The plugin lookup fails for every class name.
    when(plugins.newConnector(anyString())).thenThrow(new ConnectException("No class found"));
    when(herder.plugins()).thenReturn(plugins);
    when(configStore.snapshot()).thenReturn(SNAPSHOT);

    ConnectorInfo connectorInfo = herder.connectorInfo(CONN1);

    assertEquals(CONN1, connectorInfo.name());
    assertEquals(CONN1_CONFIG, connectorInfo.config());
    assertEquals(expectedTasks, connectorInfo.tasks());
    assertEquals(ConnectorType.UNKNOWN, connectorInfo.type());
}
/**
 * After {@code onFailure} is reported for a task, {@code taskStatus} should describe that task as
 * FAILED on this worker and carry a non-null trace.
 */
@Test
public void testTaskStatus() {
    // Uses the class-level workerId field; the former local copy held the same value and only shadowed it.
    ConnectorTaskId taskId = new ConnectorTaskId(connectorName, 0);
    AbstractHerder herder = mock(AbstractHerder.class, withSettings()
        .useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
        .defaultAnswer(CALLS_REAL_METHODS));

    // Capture whatever status the herder writes, and serve that same object back on lookup.
    final ArgumentCaptor<TaskStatus> taskStatusArgumentCaptor = ArgumentCaptor.forClass(TaskStatus.class);
    doNothing().when(statusStore).putSafe(taskStatusArgumentCaptor.capture());
    when(statusStore.get(taskId)).thenAnswer(invocation -> taskStatusArgumentCaptor.getValue());

    herder.onFailure(taskId, new RuntimeException());
    ConnectorStateInfo.TaskState taskState = herder.taskStatus(taskId);

    assertEquals(workerId, taskState.workerId());
    assertEquals("FAILED", taskState.state());
    assertEquals(0, taskState.id());
    assertNotNull(taskState.trace());
}
/**
 * No restart plan should be produced when the status store has no status for the requested connector.
 */
@Test
public void testBuildRestartPlanForUnknownConnector() {
    // Distinct name so the local does not shadow the class-level connectorName field.
    String unknownConnectorName = "UnknownConnector";
    RestartRequest restartRequest = new RestartRequest(unknownConnectorName, false, true);
    AbstractHerder herder = mock(AbstractHerder.class, withSettings()
        .useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
        .defaultAnswer(CALLS_REAL_METHODS));
    when(statusStore.get(unknownConnectorName)).thenReturn(null);

    Optional<RestartPlan> mayBeRestartPlan = herder.buildRestartPlan(restartRequest);

    assertFalse(mayBeRestartPlan.isPresent());
}
/**
 * A single config entry with a null value should yield exactly one validation error, attached to that key.
 */
@Test
public void testConfigValidationNullConfig() {
    AbstractHerder herder = createConfigValidationHerder(SampleSourceConnector.class, noneConnectorClientConfigOverridePolicy);

    // Otherwise-complete config; only "testKey" carries a null value.
    final Map<String, String> connectorProps = new HashMap<>();
    connectorProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, SampleSourceConnector.class.getName());
    connectorProps.put("name", "somename");
    connectorProps.put("required", "value");
    connectorProps.put("testKey", null);

    final ConfigInfos validation = herder.validateConnectorConfig(connectorProps, false);

    assertEquals(1, validation.errorCount());
    assertErrorForKey(validation, "testKey");
    verifyValidationIsolation();
}
/**
 * Two config entries with null values should yield two validation errors, one attached to each key.
 */
@Test
public void testConfigValidationMultipleNullConfig() {
    AbstractHerder herder = createConfigValidationHerder(SampleSourceConnector.class, noneConnectorClientConfigOverridePolicy);

    // Otherwise-complete config; "testKey" and "secondTestKey" carry null values.
    final Map<String, String> connectorProps = new HashMap<>();
    connectorProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, SampleSourceConnector.class.getName());
    connectorProps.put("name", "somename");
    connectorProps.put("required", "value");
    connectorProps.put("testKey", null);
    connectorProps.put("secondTestKey", null);

    final ConfigInfos validation = herder.validateConnectorConfig(connectorProps, false);

    assertEquals(2, validation.errorCount());
    assertErrorForKey(validation, "testKey");
    assertErrorForKey(validation, "secondTestKey");
    verifyValidationIsolation();
}
/**
 * With a request built as {@code RestartRequest(name, false, true)}, the plan should restart the
 * connector and both of its tasks, whether they are RUNNING or FAILED.
 */
@Test
public void testBuildRestartPlanForConnectorAndTasks() {
    final RestartRequest request = new RestartRequest(connectorName, false, true);
    final ConnectorTaskId runningTaskId = new ConnectorTaskId(connectorName, 1);
    final ConnectorTaskId failedTaskId = new ConnectorTaskId(connectorName, 2);
    final List<TaskStatus> storedTaskStatuses = new ArrayList<>(Arrays.asList(
        new TaskStatus(runningTaskId, AbstractStatus.State.RUNNING, workerId, generation),
        new TaskStatus(failedTaskId, AbstractStatus.State.FAILED, workerId, generation)));
    final ConnectorStatus storedConnectorStatus =
        new ConnectorStatus(connectorName, AbstractStatus.State.RUNNING, workerId, generation);

    AbstractHerder herder = mock(
        AbstractHerder.class,
        withSettings()
            .useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
            .defaultAnswer(CALLS_REAL_METHODS));
    when(herder.rawConfig(connectorName)).thenReturn(null);
    when(statusStore.get(connectorName)).thenReturn(storedConnectorStatus);
    when(statusStore.getAll(connectorName)).thenReturn(storedTaskStatuses);

    Optional<RestartPlan> maybePlan = herder.buildRestartPlan(request);

    assertTrue(maybePlan.isPresent());
    RestartPlan plan = maybePlan.get();
    assertTrue(plan.shouldRestartConnector());
    assertTrue(plan.shouldRestartTasks());
    Collection<ConnectorTaskId> tasksToRestart = plan.taskIdsToRestart();
    assertEquals(2, tasksToRestart.size());
    assertTrue(tasksToRestart.contains(runningTaskId));
    assertTrue(tasksToRestart.contains(failedTaskId));
}
/**
 * With a request built as {@code RestartRequest(name, true, false)} and a RUNNING connector, a plan
 * is still returned but it restarts neither the connector nor any task.
 */
@Test
public void testBuildRestartPlanForNoRestart() {
    final RestartRequest request = new RestartRequest(connectorName, true, false);
    final ConnectorTaskId runningTaskId = new ConnectorTaskId(connectorName, 1);
    final ConnectorTaskId failedTaskId = new ConnectorTaskId(connectorName, 2);
    final List<TaskStatus> storedTaskStatuses = new ArrayList<>(Arrays.asList(
        new TaskStatus(runningTaskId, AbstractStatus.State.RUNNING, workerId, generation),
        new TaskStatus(failedTaskId, AbstractStatus.State.FAILED, workerId, generation)));
    final ConnectorStatus storedConnectorStatus =
        new ConnectorStatus(connectorName, AbstractStatus.State.RUNNING, workerId, generation);

    AbstractHerder herder = mock(
        AbstractHerder.class,
        withSettings()
            .useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
            .defaultAnswer(CALLS_REAL_METHODS));
    when(herder.rawConfig(connectorName)).thenReturn(null);
    when(statusStore.get(connectorName)).thenReturn(storedConnectorStatus);
    when(statusStore.getAll(connectorName)).thenReturn(storedTaskStatuses);

    Optional<RestartPlan> maybePlan = herder.buildRestartPlan(request);

    assertTrue(maybePlan.isPresent());
    RestartPlan plan = maybePlan.get();
    assertFalse(plan.shouldRestartConnector());
    assertFalse(plan.shouldRestartTasks());
    assertTrue(plan.taskIdsToRestart().isEmpty());
}
/**
 * Validating an empty config should be rejected with a {@link BadRequestException}, after the
 * config has been passed through the transformer.
 */
@Test
public void testConfigValidationEmptyConfig() {
    // NOTE(review): the trailing 0 is forwarded to a createConfigValidationHerder overload defined
    // elsewhere in this class; confirm its meaning there.
    AbstractHerder herder = createConfigValidationHerder(SampleSourceConnector.class, noneConnectorClientConfigOverridePolicy, 0);
    final Map<String, String> emptyConfig = Collections.emptyMap();

    assertThrows(BadRequestException.class, () -> herder.validateConnectorConfig(emptyConfig, false));

    verify(transformer).transform(emptyConfig);
}
/**
 * Validating a config that contains only the connector class should report two errors: one for
 * the missing connector name and one for the connector's own "required" setting.
 */
@Test
public void testConfigValidationMissingName() {
    final Class<? extends Connector> connectorClass = SampleSourceConnector.class;
    final String connectorClassName = connectorClass.getName();
    AbstractHerder herder = createConfigValidationHerder(connectorClass, noneConnectorClientConfigOverridePolicy);

    Map<String, String> classOnlyConfig =
        Collections.singletonMap(ConnectorConfig.CONNECTOR_CLASS_CONFIG, connectorClassName);
    ConfigInfos result = herder.validateConnectorConfig(classOnlyConfig, false);

    // These assertions depend heavily on the config fields of SourceConnectorConfig,
    // which are expected to change rarely.
    assertEquals(connectorClassName, result.name());
    List<String> expectedGroups = Arrays.asList(
        ConnectorConfig.COMMON_GROUP,
        ConnectorConfig.TRANSFORMS_GROUP,
        ConnectorConfig.PREDICATES_GROUP,
        ConnectorConfig.ERROR_GROUP,
        SourceConnectorConfig.TOPIC_CREATION_GROUP,
        SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_GROUP,
        SourceConnectorConfig.OFFSETS_TOPIC_GROUP);
    assertEquals(expectedGroups, result.groups());
    assertEquals(2, result.errorCount());

    Map<String, ConfigInfo> infosByKey = result.values().stream()
        .collect(Collectors.toMap(entry -> entry.configKey().name(), Function.identity()));
    // Base connector config has 14 fields, connector's configs add 7
    assertEquals(21, infosByKey.size());

    // Missing name should generate an error
    ConfigValueInfo nameValue = infosByKey.get(ConnectorConfig.NAME_CONFIG).configValue();
    assertEquals(ConnectorConfig.NAME_CONFIG, nameValue.name());
    assertEquals(1, nameValue.errors().size());

    // "required" config from connector should generate an error
    ConfigValueInfo requiredValue = infosByKey.get("required").configValue();
    assertEquals("required", requiredValue.name());
    assertEquals(1, requiredValue.errors().size());

    verifyValidationIsolation();
}
/**
 * A sink connector config that sets both the topics list and the topics regex should fail
 * validation with a {@link ConfigException}.
 */
@Test
public void testConfigValidationInvalidTopics() {
    AbstractHerder herder = createConfigValidationHerder(SampleSinkConnector.class, noneConnectorClientConfigOverridePolicy);

    final Map<String, String> sinkProps = new HashMap<>();
    sinkProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, SampleSinkConnector.class.getName());
    // Both topic selectors are set at once.
    sinkProps.put(SinkConnectorConfig.TOPICS_CONFIG, "topic1,topic2");
    sinkProps.put(SinkConnectorConfig.TOPICS_REGEX_CONFIG, "topic.*");

    assertThrows(ConfigException.class, () -> herder.validateConnectorConfig(sinkProps, false));
    verifyValidationIsolation();
}
/**
 * A sink connector config whose DLQ topic name equals its single consumed topic should fail
 * validation with a {@link ConfigException}.
 */
@Test
public void testConfigValidationTopicsWithDlq() {
    AbstractHerder herder = createConfigValidationHerder(SampleSinkConnector.class, noneConnectorClientConfigOverridePolicy);

    final Map<String, String> sinkProps = new HashMap<>();
    sinkProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, SampleSinkConnector.class.getName());
    // The DLQ topic is the same topic the connector reads from.
    sinkProps.put(SinkConnectorConfig.TOPICS_CONFIG, "topic1");
    sinkProps.put(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, "topic1");

    assertThrows(ConfigException.class, () -> herder.validateConnectorConfig(sinkProps, false));
    verifyValidationIsolation();
}
/**
 * A sink connector config whose DLQ topic name ("topic1") is matched by its topics regex
 * ("topic.*") should fail validation with a {@link ConfigException}.
 */
@Test
public void testConfigValidationTopicsRegexWithDlq() {
    AbstractHerder herder = createConfigValidationHerder(SampleSinkConnector.class, noneConnectorClientConfigOverridePolicy);

    final Map<String, String> sinkProps = new HashMap<>();
    sinkProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, SampleSinkConnector.class.getName());
    // The regex below matches the DLQ topic name.
    sinkProps.put(SinkConnectorConfig.TOPICS_REGEX_CONFIG, "topic.*");
    sinkProps.put(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, "topic1");

    assertThrows(ConfigException.class, () -> herder.validateConnectorConfig(sinkProps, false));
    verifyValidationIsolation();
}
/**
 * Validates a source connector config that declares two transformation aliases: xformA, which has
 * a type and therefore contributes its own embedded configs, and xformB, which has no type and
 * must be reported as an error. Each alias is also expected to add its own group to the result.
 */
@Test
public void testConfigValidationTransformsExtendResults() {
    final Class<? extends Connector> connectorClass = SampleSourceConnector.class;
    AbstractHerder herder = createConfigValidationHerder(connectorClass, noneConnectorClientConfigOverridePolicy);
    // 2 transform aliases defined -> 2 plugin lookups
    when(plugins.transformations()).thenReturn(Collections.singleton(transformationPluginDesc()));

    // Define 2 transformations. One has a class defined and so can get embedded configs, the other is missing
    // class info that should generate an error.
    Map<String, String> config = new HashMap<>();
    config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, connectorClass.getName());
    config.put(ConnectorConfig.NAME_CONFIG, "connector-name");
    config.put(ConnectorConfig.TRANSFORMS_CONFIG, "xformA,xformB");
    config.put(ConnectorConfig.TRANSFORMS_CONFIG + ".xformA.type", SampleTransformation.class.getName());
    config.put("required", "value"); // connector required config

    ConfigInfos result = herder.validateConnectorConfig(config, false);
    // JUnit's assertEquals takes (expected, actual); keeping that order makes failure messages read correctly.
    assertEquals(ConnectorType.SOURCE, herder.connectorType(config));

    // The name and the connector's "required" config are both supplied here, so the expected errors come from
    // xformB lacking a type (asserted below). Note that these assertions depend heavily on the config fields
    // for SourceConnectorConfig, but we expect these to change rarely.
    assertEquals(connectorClass.getName(), result.name());
    // Each transform also gets its own group
    List<String> expectedGroups = Arrays.asList(
        ConnectorConfig.COMMON_GROUP,
        ConnectorConfig.TRANSFORMS_GROUP,
        ConnectorConfig.PREDICATES_GROUP,
        ConnectorConfig.ERROR_GROUP,
        SourceConnectorConfig.TOPIC_CREATION_GROUP,
        SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_GROUP,
        SourceConnectorConfig.OFFSETS_TOPIC_GROUP,
        "Transforms: xformA",
        "Transforms: xformB"
    );
    assertEquals(expectedGroups, result.groups());
    assertEquals(2, result.errorCount());

    Map<String, ConfigInfo> infos = result.values().stream()
        .collect(Collectors.toMap(info -> info.configKey().name(), Function.identity()));
    assertEquals(26, infos.size());
    // Should get 2 type fields from the transforms, first adds its own config since it has a valid class
    assertEquals("transforms.xformA.type",
        infos.get("transforms.xformA.type").configValue().name());
    assertTrue(infos.get("transforms.xformA.type").configValue().errors().isEmpty());
    assertEquals("transforms.xformA.subconfig",
        infos.get("transforms.xformA.subconfig").configValue().name());
    assertEquals("transforms.xformB.type", infos.get("transforms.xformB.type").configValue().name());
    assertFalse(infos.get("transforms.xformB.type").configValue().errors().isEmpty());

    verify(plugins, times(2)).transformations();
    verifyValidationIsolation();
}
/**
 * Validates a source connector config with one transformation (xformA, guarded by predicate predX)
 * and two predicate aliases: predX, which has a type and contributes its own embedded configs, and
 * predY, which has no type and must be reported as an error.
 */
@Test
public void testConfigValidationPredicatesExtendResults() {
    final Class<? extends Connector> connectorClass = SampleSourceConnector.class;
    final String connectorClassName = connectorClass.getName();
    AbstractHerder herder = createConfigValidationHerder(connectorClass, noneConnectorClientConfigOverridePolicy);

    final PluginDesc<Transformation<?>> transformationDesc = transformationPluginDesc();
    final PluginDesc<Predicate<?>> predicateDesc = predicatePluginDesc();
    when(plugins.transformations()).thenReturn(Collections.singleton(transformationDesc));
    when(plugins.predicates()).thenReturn(Collections.singleton(predicateDesc));

    // Two predicate aliases: predX has a class and so can get embedded configs; predY has no class
    // info, which should generate an error.
    final Map<String, String> connectorProps = new HashMap<>();
    connectorProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, connectorClassName);
    connectorProps.put(ConnectorConfig.NAME_CONFIG, "connector-name");
    connectorProps.put(ConnectorConfig.TRANSFORMS_CONFIG, "xformA");
    connectorProps.put(ConnectorConfig.TRANSFORMS_CONFIG + ".xformA.type", SampleTransformation.class.getName());
    connectorProps.put(ConnectorConfig.TRANSFORMS_CONFIG + ".xformA.predicate", "predX");
    connectorProps.put(ConnectorConfig.PREDICATES_CONFIG, "predX,predY");
    connectorProps.put(ConnectorConfig.PREDICATES_CONFIG + ".predX.type", SamplePredicate.class.getName());
    connectorProps.put("required", "value"); // connector required config

    ConfigInfos result = herder.validateConnectorConfig(connectorProps, false);
    assertEquals(ConnectorType.SOURCE, herder.connectorType(connectorProps));

    // These assertions depend heavily on the config fields of SourceConnectorConfig,
    // which are expected to change rarely.
    assertEquals(connectorClassName, result.name());
    // The transform and each predicate alias also get their own group
    List<String> expectedGroups = Arrays.asList(
        ConnectorConfig.COMMON_GROUP,
        ConnectorConfig.TRANSFORMS_GROUP,
        ConnectorConfig.PREDICATES_GROUP,
        ConnectorConfig.ERROR_GROUP,
        SourceConnectorConfig.TOPIC_CREATION_GROUP,
        SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_GROUP,
        SourceConnectorConfig.OFFSETS_TOPIC_GROUP,
        "Transforms: xformA",
        "Predicates: predX",
        "Predicates: predY"
    );
    assertEquals(expectedGroups, result.groups());
    assertEquals(2, result.errorCount());

    Map<String, ConfigInfo> infosByKey = result.values().stream()
        .collect(Collectors.toMap(entry -> entry.configKey().name(), Function.identity()));
    assertEquals(28, infosByKey.size());

    // Transform entries: type, embedded subconfig, and the predicate/negate settings, none with errors checked as failing
    assertEquals("transforms.xformA.type", infosByKey.get("transforms.xformA.type").configValue().name());
    assertTrue(infosByKey.get("transforms.xformA.type").configValue().errors().isEmpty());
    assertEquals("transforms.xformA.subconfig", infosByKey.get("transforms.xformA.subconfig").configValue().name());
    assertEquals("transforms.xformA.predicate", infosByKey.get("transforms.xformA.predicate").configValue().name());
    assertTrue(infosByKey.get("transforms.xformA.predicate").configValue().errors().isEmpty());
    assertEquals("transforms.xformA.negate", infosByKey.get("transforms.xformA.negate").configValue().name());
    assertTrue(infosByKey.get("transforms.xformA.negate").configValue().errors().isEmpty());

    // Predicate entries: predX adds its own config since it has a class; predY's type must carry an error
    assertEquals("predicates.predX.type", infosByKey.get("predicates.predX.type").configValue().name());
    assertEquals("predicates.predX.predconfig", infosByKey.get("predicates.predX.predconfig").configValue().name());
    assertEquals("predicates.predY.type", infosByKey.get("predicates.predY.type").configValue().name());
    assertFalse(infosByKey.get("predicates.predY.type").configValue().errors().isEmpty());

    verify(plugins).transformations();
    verify(plugins, times(2)).predicates();
    verifyValidationIsolation();
}
/**
 * Builds a plugin descriptor for {@link SamplePredicate} at version "1.0" using the mocked class loader.
 * The raw {@code PluginDesc} type is used on purpose, hence the suppressed warnings.
 */
@SuppressWarnings({"rawtypes", "unchecked"})
private PluginDesc<Predicate<?>> predicatePluginDesc() {
    PluginDesc descriptor = new PluginDesc(SamplePredicate.class, "1.0", classLoader);
    return descriptor;
}
/**
 * Builds a plugin descriptor for {@link SampleTransformation} at version "1.0" using the mocked class loader.
 * The raw {@code PluginDesc} type is used on purpose, hence the suppressed warnings.
 */
@SuppressWarnings({"rawtypes", "unchecked"})
private PluginDesc<Transformation<?>> transformationPluginDesc() {
    PluginDesc descriptor = new PluginDesc(SampleTransformation.class, "1.0", classLoader);
    return descriptor;
}
/**
 * With {@link PrincipalConnectorClientConfigOverridePolicy} in place, a producer override of
 * {@code ProducerConfig.ACKS_CONFIG} must be reported as an error while a producer override of
 * {@code SaslConfigs.SASL_JAAS_CONFIG} must validate without errors.
 */
@Test
public void testConfigValidationPrincipalOnlyOverride() {
    final Class<? extends Connector> connectorClass = SampleSourceConnector.class;
    AbstractHerder herder = createConfigValidationHerder(connectorClass, new PrincipalConnectorClientConfigOverridePolicy());

    Map<String, String> config = new HashMap<>();
    config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, connectorClass.getName());
    config.put(ConnectorConfig.NAME_CONFIG, "connector-name");
    config.put("required", "value"); // connector required config
    String ackConfigKey = producerOverrideKey(ProducerConfig.ACKS_CONFIG);
    String saslConfigKey = producerOverrideKey(SaslConfigs.SASL_JAAS_CONFIG);
    config.put(ackConfigKey, "none");
    config.put(saslConfigKey, "jaas_config");

    ConfigInfos result = herder.validateConnectorConfig(config, false);
    // JUnit's assertEquals takes (expected, actual); keeping that order makes failure messages read correctly.
    assertEquals(ConnectorType.SOURCE, herder.connectorType(config));

    // We expect there to be an error because the override policy does not allow the ACKS override. Note that these
    // assertions depend heavily on the config fields for SourceConnectorConfig, but we expect these to change rarely.
    assertEquals(SampleSourceConnector.class.getName(), result.name());
    // No transforms or predicates are configured, so only the base groups are expected
    List<String> expectedGroups = Arrays.asList(
        ConnectorConfig.COMMON_GROUP,
        ConnectorConfig.TRANSFORMS_GROUP,
        ConnectorConfig.PREDICATES_GROUP,
        ConnectorConfig.ERROR_GROUP,
        SourceConnectorConfig.TOPIC_CREATION_GROUP,
        SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_GROUP,
        SourceConnectorConfig.OFFSETS_TOPIC_GROUP
    );
    assertEquals(expectedGroups, result.groups());
    assertEquals(1, result.errorCount());
    // Base connector config has 14 fields, connector's configs add 7, and 2 producer overrides
    assertEquals(23, result.values().size());
    // The ACKS override carries at least one error ...
    assertTrue(result.values().stream().anyMatch(
        configInfo -> ackConfigKey.equals(configInfo.configValue().name()) && !configInfo.configValue().errors().isEmpty()));
    // ... while the SASL JAAS override carries none.
    assertTrue(result.values().stream().anyMatch(
        configInfo -> saslConfigKey.equals(configInfo.configValue().name()) && configInfo.configValue().errors().isEmpty()));

    verifyValidationIsolation();
}
/**
 * Validates a source connector config that carries producer overrides of several value types
 * (string, int, long, boolean, list, class) under {@link AllConnectorClientConfigOverridePolicy},
 * and checks that the validated value reported for each override equals the raw value submitted.
 */
@Test
public void testConfigValidationAllOverride() {
    final Class<? extends Connector> connectorClass = SampleSourceConnector.class;
    AbstractHerder herder = createConfigValidationHerder(connectorClass, new AllConnectorClientConfigOverridePolicy());

    Map<String, String> config = new HashMap<>();
    config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, connectorClass.getName());
    config.put(ConnectorConfig.NAME_CONFIG, "connector-name");
    config.put("required", "value"); // connector required config

    // Try to test a variety of configuration types: string, int, long, boolean, list, class
    String protocolConfigKey = producerOverrideKey(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG);
    config.put(protocolConfigKey, "SASL_PLAINTEXT");
    String maxRequestSizeConfigKey = producerOverrideKey(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
    config.put(maxRequestSizeConfigKey, "420");
    String maxBlockConfigKey = producerOverrideKey(ProducerConfig.MAX_BLOCK_MS_CONFIG);
    config.put(maxBlockConfigKey, "28980");
    String idempotenceConfigKey = producerOverrideKey(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG);
    config.put(idempotenceConfigKey, "true");
    String bootstrapServersConfigKey = producerOverrideKey(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
    config.put(bootstrapServersConfigKey, "SASL_PLAINTEXT://localhost:12345,SASL_PLAINTEXT://localhost:23456");
    String loginCallbackHandlerConfigKey = producerOverrideKey(SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS);
    config.put(loginCallbackHandlerConfigKey, OAuthBearerUnsecuredLoginCallbackHandler.class.getName());

    // The names of all overrides added above; used to pick them out of the validation result
    final Set<String> overriddenClientConfigs = new HashSet<>(Arrays.asList(
        protocolConfigKey,
        maxRequestSizeConfigKey,
        maxBlockConfigKey,
        idempotenceConfigKey,
        bootstrapServersConfigKey,
        loginCallbackHandlerConfigKey
    ));

    ConfigInfos result = herder.validateConnectorConfig(config, false);
    // Expected value goes first so that a failure is reported the right way round
    assertEquals(ConnectorType.SOURCE, herder.connectorType(config));

    // Collect the validated value of every override that appears in the result
    Map<String, String> validatedOverriddenClientConfigs = new HashMap<>();
    for (ConfigInfo configInfo : result.values()) {
        String configName = configInfo.configKey().name();
        if (overriddenClientConfigs.contains(configName)) {
            validatedOverriddenClientConfigs.put(configName, configInfo.configValue().value());
        }
    }

    // The raw values submitted for those same overrides
    Map<String, String> rawOverriddenClientConfigs = config.entrySet().stream()
        .filter(e -> overriddenClientConfigs.contains(e.getKey()))
        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

    assertEquals(rawOverriddenClientConfigs, validatedOverriddenClientConfigs);
    verifyValidationIsolation();
}
/**
 * Exercises {@code AbstractHerder.reverseTransform} against a snapshot that has task configs and
 * one that has only a connector config, checking which keys get their reference values back.
 */
@Test
public void testReverseTransformConfigs() {
    // Build a single task config holding constant values for TEST_KEY and TEST_KEY2
    Map<String, String> taskProps = new HashMap<>();
    taskProps.put(TaskConfig.TASK_CLASS_CONFIG, SampleSourceConnector.SampleSourceTask.class.getName());
    taskProps.put(TEST_KEY, TEST_VAL);
    taskProps.put(TEST_KEY2, TEST_VAL2);
    List<Map<String, String>> taskPropsList = new ArrayList<>();
    taskPropsList.add(taskProps);

    // The SNAPSHOT has a task config with TEST_KEY and TEST_REF
    List<Map<String, String>> fromTaskSnapshot = AbstractHerder.reverseTransform(CONN1, SNAPSHOT, taskPropsList);
    Map<String, String> firstFromTaskSnapshot = fromTaskSnapshot.get(0);
    assertEquals(TEST_REF, firstFromTaskSnapshot.get(TEST_KEY));

    // The SNAPSHOT has no task configs but does have a connector config with TEST_KEY2 and TEST_REF2
    List<Map<String, String>> fromConnectorSnapshot = AbstractHerder.reverseTransform(CONN1, SNAPSHOT_NO_TASKS, taskPropsList);
    Map<String, String> firstFromConnectorSnapshot = fromConnectorSnapshot.get(0);
    assertEquals(TEST_REF2, firstFromConnectorSnapshot.get(TEST_KEY2));

    // TEST_KEY3 is absent from the submitted task configs, so it must be absent from the result too
    List<Map<String, String>> repeated = AbstractHerder.reverseTransform(CONN1, SNAPSHOT_NO_TASKS, taskPropsList);
    Map<String, String> firstRepeated = repeated.get(0);
    assertFalse(firstRepeated.containsKey(TEST_KEY3));
}
/**
 * Asserts that exactly one error in total is attached to the config value(s) named {@code testKey}.
 *
 * @param configInfos validation result to inspect
 * @param testKey     name of the config value whose errors are counted
 */
private void assertErrorForKey(ConfigInfos configInfos, String testKey) {
    final List<String> matchingErrors = new ArrayList<>();
    for (ConfigInfo info : configInfos.values()) {
        ConfigValueInfo valueInfo = info.configValue();
        if (valueInfo.name().equals(testKey)) {
            matchingErrors.addAll(valueInfo.errors());
        }
    }
    assertEquals(1, matchingErrors.size());
}
/**
 * Runs a set of raw config values that contain {@code ${provider:path:key}}-style variables
 * through the regex check, followed by one value without a variable that must not match.
 */
@Test
public void testConfigProviderRegex() {
    // Every entry here is expected to be detected as containing a variable reference
    final String[] valuesWithVariables = {
        "\"${::}\"",
        "${::}",
        "\"${:/a:somevar}\"",
        "\"${file::somevar}\"",
        "${file:/a/b/c:}",
        "${file:/tmp/somefile.txt:somevar}",
        "\"${file:/tmp/somefile.txt:somevar}\"",
        "plain.PlainLoginModule required username=\"${file:/tmp/somefile.txt:somevar}\"",
        "plain.PlainLoginModule required username=${file:/tmp/somefile.txt:somevar}",
        "plain.PlainLoginModule required username=${file:/tmp/somefile.txt:somevar} not null",
        "plain.PlainLoginModule required username=${file:/tmp/somefile.txt:somevar} password=${file:/tmp/somefile.txt:othervar}"
    };
    for (String rawValue : valuesWithVariables) {
        testConfigProviderRegex(rawValue);
    }

    // No variable reference present, so no match is expected
    testConfigProviderRegex("plain.PlainLoginModule required username", false);
}
/**
 * Checks {@code AbstractHerder.generateResult} when every config value has a matching config key
 * and none of the values carries an error message.
 */
@Test
public void testGenerateResultWithConfigValuesAllUsingConfigKeysAndWithNoErrors() {
    String name = "com.acme.connector.MyConnector";
    Map<String, ConfigDef.ConfigKey> keys = new HashMap<>();
    addConfigKey(keys, "config.a1", null);
    addConfigKey(keys, "config.b1", "group B");
    addConfigKey(keys, "config.b2", "group B");
    addConfigKey(keys, "config.c1", "group C");

    // Group names are spelled the same way as the groups assigned to the keys above
    List<String> groups = Arrays.asList("group B", "group C");

    List<ConfigValue> values = new ArrayList<>();
    addValue(values, "config.a1", "value.a1");
    addValue(values, "config.b1", "value.b1");
    addValue(values, "config.b2", "value.b2");
    addValue(values, "config.c1", "value.c1");

    ConfigInfos infos = AbstractHerder.generateResult(name, keys, values, groups);

    assertEquals(name, infos.name());
    assertEquals(groups, infos.groups());
    assertEquals(values.size(), infos.values().size());
    assertEquals(0, infos.errorCount());
    assertInfoKey(infos, "config.a1", null);
    assertInfoKey(infos, "config.b1", "group B");
    assertInfoKey(infos, "config.b2", "group B");
    assertInfoKey(infos, "config.c1", "group C");
    assertInfoValue(infos, "config.a1", "value.a1");
    assertInfoValue(infos, "config.b1", "value.b1");
    assertInfoValue(infos, "config.b2", "value.b2");
    assertInfoValue(infos, "config.c1", "value.c1");
}
/**
 * Checks {@code AbstractHerder.generateResult} when every config value has a matching config key
 * and exactly one value ({@code config.c1}) carries an error message.
 */
@Test
public void testGenerateResultWithConfigValuesAllUsingConfigKeysAndWithSomeErrors() {
    String name = "com.acme.connector.MyConnector";
    Map<String, ConfigDef.ConfigKey> keys = new HashMap<>();
    addConfigKey(keys, "config.a1", null);
    addConfigKey(keys, "config.b1", "group B");
    addConfigKey(keys, "config.b2", "group B");
    addConfigKey(keys, "config.c1", "group C");

    // Group names are spelled the same way as the groups assigned to the keys above
    List<String> groups = Arrays.asList("group B", "group C");

    List<ConfigValue> values = new ArrayList<>();
    addValue(values, "config.a1", "value.a1");
    addValue(values, "config.b1", "value.b1");
    addValue(values, "config.b2", "value.b2");
    addValue(values, "config.c1", "value.c1", "error c1");

    ConfigInfos infos = AbstractHerder.generateResult(name, keys, values, groups);

    assertEquals(name, infos.name());
    assertEquals(groups, infos.groups());
    assertEquals(values.size(), infos.values().size());
    assertEquals(1, infos.errorCount());
    assertInfoKey(infos, "config.a1", null);
    assertInfoKey(infos, "config.b1", "group B");
    assertInfoKey(infos, "config.b2", "group B");
    assertInfoKey(infos, "config.c1", "group C");
    assertInfoValue(infos, "config.a1", "value.a1");
    assertInfoValue(infos, "config.b1", "value.b1");
    assertInfoValue(infos, "config.b2", "value.b2");
    assertInfoValue(infos, "config.c1", "value.c1", "error c1");
}
/**
 * Checks {@code AbstractHerder.generateResult} when there are more config values than config keys:
 * the two extra values must be reported without a config key, and errors on both keyed and
 * un-keyed values must be counted.
 */
@Test
public void testGenerateResultWithConfigValuesMoreThanConfigKeysAndWithSomeErrors() {
    String name = "com.acme.connector.MyConnector";
    Map<String, ConfigDef.ConfigKey> keys = new HashMap<>();
    addConfigKey(keys, "config.a1", null);
    addConfigKey(keys, "config.b1", "group B");
    addConfigKey(keys, "config.b2", "group B");
    addConfigKey(keys, "config.c1", "group C");

    // Group names are spelled the same way as the groups assigned to the keys above
    List<String> groups = Arrays.asList("group B", "group C");

    List<ConfigValue> values = new ArrayList<>();
    addValue(values, "config.a1", "value.a1");
    addValue(values, "config.b1", "value.b1");
    addValue(values, "config.b2", "value.b2");
    addValue(values, "config.c1", "value.c1", "error c1");
    // These two values have no corresponding entry in 'keys'
    addValue(values, "config.extra1", "value.extra1");
    addValue(values, "config.extra2", "value.extra2", "error extra2");

    ConfigInfos infos = AbstractHerder.generateResult(name, keys, values, groups);

    assertEquals(name, infos.name());
    assertEquals(groups, infos.groups());
    assertEquals(values.size(), infos.values().size());
    assertEquals(2, infos.errorCount());
    assertInfoKey(infos, "config.a1", null);
    assertInfoKey(infos, "config.b1", "group B");
    assertInfoKey(infos, "config.b2", "group B");
    assertInfoKey(infos, "config.c1", "group C");
    assertNoInfoKey(infos, "config.extra1");
    assertNoInfoKey(infos, "config.extra2");
    assertInfoValue(infos, "config.a1", "value.a1");
    assertInfoValue(infos, "config.b1", "value.b1");
    assertInfoValue(infos, "config.b2", "value.b2");
    assertInfoValue(infos, "config.c1", "value.c1", "error c1");
    assertInfoValue(infos, "config.extra1", "value.extra1");
    assertInfoValue(infos, "config.extra2", "value.extra2", "error extra2");
}
/**
 * Checks {@code AbstractHerder.generateResult} when no config keys are supplied at all: every
 * value must be reported without a config key, and the two error messages must be counted.
 */
@Test
public void testGenerateResultWithConfigValuesWithNoConfigKeysAndWithSomeErrors() {
    final String connectorName = "com.acme.connector.MyConnector";
    final Map<String, ConfigDef.ConfigKey> noKeys = new HashMap<>();
    final List<String> noGroups = new ArrayList<>();

    final List<ConfigValue> configValues = new ArrayList<>();
    addValue(configValues, "config.a1", "value.a1");
    addValue(configValues, "config.b1", "value.b1");
    addValue(configValues, "config.b2", "value.b2");
    addValue(configValues, "config.c1", "value.c1", "error c1");
    addValue(configValues, "config.extra1", "value.extra1");
    addValue(configValues, "config.extra2", "value.extra2", "error extra2");

    final ConfigInfos result = AbstractHerder.generateResult(connectorName, noKeys, configValues, noGroups);

    assertEquals(connectorName, result.name());
    assertEquals(noGroups, result.groups());
    assertEquals(configValues.size(), result.values().size());
    assertEquals(2, result.errorCount());

    // None of the values has a key, in the same order they were added
    final String[] unkeyedNames = {
        "config.a1", "config.b1", "config.b2", "config.c1", "config.extra1", "config.extra2"
    };
    for (String unkeyedName : unkeyedNames) {
        assertNoInfoKey(result, unkeyedName);
    }

    assertInfoValue(result, "config.a1", "value.a1");
    assertInfoValue(result, "config.b1", "value.b1");
    assertInfoValue(result, "config.b2", "value.b2");
    assertInfoValue(result, "config.c1", "value.c1", "error c1");
    assertInfoValue(result, "config.extra1", "value.extra1");
    assertInfoValue(result, "config.extra2", "value.extra2", "error extra2");
}
/**
 * Plugin-config check for {@code SampleSinkConnector::config}, with the key count of
 * {@code SinkConnectorConfig.configDef()} added to the expected total.
 */
@Test
public void testSinkConnectorPluginConfig() throws ClassNotFoundException {
    final Optional<ConfigDef> sinkBaseConfig = Optional.of(SinkConnectorConfig.configDef());
    testConnectorPluginConfig("sink", SampleSinkConnector::new, SampleSinkConnector::config, sinkBaseConfig);
}
/**
 * Plugin-config check for {@code SampleSinkConnector::configWithCommon}; no base config is
 * added to the expected total.
 */
@Test
public void testSinkConnectorPluginConfigIncludingCommon() throws ClassNotFoundException {
    final Optional<ConfigDef> noBaseConfig = Optional.empty();
    testConnectorPluginConfig("sink", SampleSinkConnector::new, SampleSinkConnector::configWithCommon, noBaseConfig);
}
/**
 * Plugin-config check for {@code SampleSourceConnector::config}, with the key count of
 * {@code SourceConnectorConfig.configDef()} added to the expected total.
 */
@Test
public void testSourceConnectorPluginConfig() throws ClassNotFoundException {
    final Optional<ConfigDef> sourceBaseConfig = Optional.of(SourceConnectorConfig.configDef());
    testConnectorPluginConfig("source", SampleSourceConnector::new, SampleSourceConnector::config, sourceBaseConfig);
}
/**
 * Plugin-config check for {@code SampleSourceConnector::configWithCommon}; no base config is
 * added to the expected total.
 */
@Test
public void testSourceConnectorPluginConfigIncludingCommon() throws ClassNotFoundException {
    final Optional<ConfigDef> noBaseConfig = Optional.empty();
    testConnectorPluginConfig("source", SampleSourceConnector::new, SampleSourceConnector::configWithCommon, noBaseConfig);
}
/**
 * Plugin-config check for {@code SampleConverterWithHeaders::config} under the plugin name
 * "converter"; no base config is added to the expected total.
 */
@Test
public void testConverterPluginConfig() throws ClassNotFoundException {
    final Optional<ConfigDef> noBaseConfig = Optional.empty();
    testConnectorPluginConfig("converter", SampleConverterWithHeaders::new, SampleConverterWithHeaders::config, noBaseConfig);
}
/**
 * Plugin-config check for {@code SampleHeaderConverter::config} under the plugin name
 * "header-converter"; no base config is added to the expected total.
 */
@Test
public void testHeaderConverterPluginConfig() throws ClassNotFoundException {
    final Optional<ConfigDef> noBaseConfig = Optional.empty();
    testConnectorPluginConfig("header-converter", SampleHeaderConverter::new, SampleHeaderConverter::config, noBaseConfig);
}
/**
 * Plugin-config check for {@code SamplePredicate::config} under the plugin name "predicate";
 * no base config is added to the expected total.
 */
@Test
public void testPredicatePluginConfig() throws ClassNotFoundException {
    final Optional<ConfigDef> noBaseConfig = Optional.empty();
    testConnectorPluginConfig("predicate", SamplePredicate::new, SamplePredicate::config, noBaseConfig);
}
/**
 * Plugin-config check for {@code SampleTransformation::config} under the plugin name
 * "transformation"; no base config is added to the expected total.
 */
@Test
public void testTransformationPluginConfig() throws ClassNotFoundException {
    final Optional<ConfigDef> noBaseConfig = Optional.empty();
    testConnectorPluginConfig("transformation", SampleTransformation::new, SampleTransformation::config, noBaseConfig);
}
/**
 * Shared body of the plugin-config tests: stubs {@code plugins} to hand out instances from
 * {@code newPluginInstance}, asks the herder for the config keys of {@code pluginName}, and checks
 * the number of keys returned.
 *
 * @param pluginName        name passed to {@code connectorPluginConfig}
 * @param newPluginInstance factory for fresh plugin instances
 * @param pluginConfig      extracts the plugin's own {@link ConfigDef} from an instance
 * @param baseConfig        additional config whose key count is added to the expectation, if present
 * @param <T>               plugin type
 */
private <T> void testConnectorPluginConfig(
    String pluginName,
    Supplier<T> newPluginInstance,
    Function<T, ConfigDef> pluginConfig,
    Optional<ConfigDef> baseConfig
) throws ClassNotFoundException {
    AbstractHerder herder = mock(AbstractHerder.class, withSettings()
        .useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
        .defaultAnswer(CALLS_REAL_METHODS));

    when(plugins.pluginClass(pluginName)).then(invocation -> newPluginInstance.get().getClass());
    when(plugins.newPlugin(anyString())).then(invocation -> newPluginInstance.get());
    when(herder.plugins()).thenReturn(plugins);

    List<ConfigKeyInfo> reportedKeys = herder.connectorPluginConfig(pluginName);
    assertNotNull(reportedKeys);

    // Expected size = keys defined by the plugin itself + keys of the optional base config
    int pluginKeyCount = pluginConfig.apply(newPluginInstance.get()).names().size();
    int baseKeyCount = 0;
    if (baseConfig.isPresent()) {
        baseKeyCount = baseConfig.get().names().size();
    }
    assertEquals(baseKeyCount + pluginKeyCount, reportedKeys.size());

    // Make sure that we used the correct class loader when interacting with the plugin
    ClassLoader pluginLoader = newPluginInstance.get().getClass().getClassLoader();
    verify(plugins).withClassLoader(pluginLoader);
}
/**
 * When {@code plugins.pluginClass} throws {@link ClassNotFoundException}, asking the herder for
 * the plugin's config must fail with {@link NotFoundException}.
 */
@Test
public void testGetConnectorConfigDefWithBadName() throws Exception {
    String connName = "AnotherPlugin";
    AbstractHerder herder = mock(AbstractHerder.class, withSettings()
        .useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
        .defaultAnswer(CALLS_REAL_METHODS));
    when(worker.getPlugins()).thenReturn(plugins);
    when(plugins.pluginClass(anyString())).thenThrow(new ClassNotFoundException());

    // Scope the expectation to the call under test so a failure in the setup above cannot pass the test
    assertThrows(NotFoundException.class, () -> herder.connectorPluginConfig(connName));
}
/**
 * When the resolved plugin instance is a {@link DirectoryConfigProvider}, asking the herder for
 * the plugin's config must fail with {@code BadRequestException}.
 */
@Test
@SuppressWarnings({"rawtypes", "unchecked"})
public void testGetConnectorConfigDefWithInvalidPluginType() throws Exception {
    String connName = "AnotherPlugin";
    AbstractHerder herder = mock(AbstractHerder.class, withSettings()
        .useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
        .defaultAnswer(CALLS_REAL_METHODS));
    when(worker.getPlugins()).thenReturn(plugins);
    when(plugins.pluginClass(anyString())).thenReturn((Class) Object.class);
    when(plugins.newPlugin(anyString())).thenReturn(new DirectoryConfigProvider());

    // Scope the expectation to the call under test so a failure in the setup above cannot pass the test
    assertThrows(BadRequestException.class, () -> herder.connectorPluginConfig(connName));
}
/**
 * When {@code plugins.newConnector} throws a {@link ConnectException}, the herder must report the
 * connector type as {@code UNKNOWN}.
 */
@Test
public void testGetConnectorTypeWithMissingPlugin() {
    final String missingClassName = "AnotherPlugin";
    AbstractHerder herder = mock(AbstractHerder.class, withSettings()
        .useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
        .defaultAnswer(CALLS_REAL_METHODS));
    when(worker.getPlugins()).thenReturn(plugins);
    when(plugins.newConnector(anyString())).thenThrow(new ConnectException("No class found"));

    Map<String, String> connectorProps = Collections.singletonMap(ConnectorConfig.CONNECTOR_CLASS_CONFIG, missingClassName);
    ConnectorType reportedType = herder.connectorType(connectorProps);
    assertEquals(ConnectorType.UNKNOWN, reportedType);
}
/**
 * A {@code null} connector config must yield {@code ConnectorType.UNKNOWN}.
 */
@Test
public void testGetConnectorTypeWithNullConfig() {
    AbstractHerder herder = mock(AbstractHerder.class, withSettings()
        .useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
        .defaultAnswer(CALLS_REAL_METHODS));

    ConnectorType reportedType = herder.connectorType(null);
    assertEquals(ConnectorType.UNKNOWN, reportedType);
}
/**
 * An empty connector config must yield {@code ConnectorType.UNKNOWN}.
 */
@Test
public void testGetConnectorTypeWithEmptyConfig() {
    AbstractHerder herder = mock(AbstractHerder.class, withSettings()
        .useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
        .defaultAnswer(CALLS_REAL_METHODS));

    Map<String, String> emptyProps = Collections.emptyMap();
    ConnectorType reportedType = herder.connectorType(emptyProps);
    assertEquals(ConnectorType.UNKNOWN, reportedType);
}
/**
 * Requesting offsets for a connector name that is not {@code CONN1} must complete the callback
 * exceptionally, with a {@link NotFoundException} as the cause.
 */
@Test
public void testConnectorOffsetsConnectorNotFound() {
    when(configStore.snapshot()).thenReturn(SNAPSHOT);
    AbstractHerder herder = mock(AbstractHerder.class, withSettings()
        .useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
        .defaultAnswer(CALLS_REAL_METHODS));

    FutureCallback<ConnectorOffsets> offsetsCallback = new FutureCallback<>();
    herder.connectorOffsets("unknown-connector", offsetsCallback);

    ExecutionException failure = assertThrows(ExecutionException.class,
        () -> offsetsCallback.get(1000, TimeUnit.MILLISECONDS));
    Throwable rootCause = failure.getCause();
    assertEquals(NotFoundException.class, rootCause.getClass());
}
/**
 * Stubs the worker to complete its callback with a fixed {@code ConnectorOffsets} value and checks
 * that the herder hands that same value to the caller's callback.
 */
@Test
public void testConnectorOffsets() throws Exception {
    ConnectorOffset firstOffset = new ConnectorOffset(
        Collections.singletonMap("partitionKey", "partitionValue"),
        Collections.singletonMap("offsetKey", "offsetValue"));
    ConnectorOffset secondOffset = new ConnectorOffset(
        Collections.singletonMap("partitionKey", "partitionValue2"),
        Collections.singletonMap("offsetKey", "offsetValue"));
    ConnectorOffsets offsets = new ConnectorOffsets(Arrays.asList(firstOffset, secondOffset));

    // Capture the callback the herder passes to the worker and complete it immediately
    @SuppressWarnings("unchecked")
    ArgumentCaptor<Callback<ConnectorOffsets>> capturedWorkerCallback = ArgumentCaptor.forClass(Callback.class);
    doAnswer(invocation -> {
        capturedWorkerCallback.getValue().onCompletion(null, offsets);
        return null;
    }).when(worker).connectorOffsets(eq(CONN1), eq(CONN1_CONFIG), capturedWorkerCallback.capture());

    AbstractHerder herder = mock(AbstractHerder.class, withSettings()
        .useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
        .defaultAnswer(CALLS_REAL_METHODS));
    when(configStore.snapshot()).thenReturn(SNAPSHOT);

    FutureCallback<ConnectorOffsets> offsetsCallback = new FutureCallback<>();
    herder.connectorOffsets(CONN1, offsetsCallback);
    ConnectorOffsets received = offsetsCallback.get(1000, TimeUnit.MILLISECONDS);
    assertEquals(offsets, received);
}
/**
 * Registers a STRING-typed, HIGH-importance config key under {@code name} in {@code keys}.
 *
 * @param keys  map to add the key to
 * @param name  name of the config key, also used as the map key
 * @param group group assigned to the config key; may be {@code null}
 */
protected void addConfigKey(Map<String, ConfigDef.ConfigKey> keys, String name, String group) {
    ConfigDef.ConfigKey configKey = new ConfigDef.ConfigKey(
        name, ConfigDef.Type.STRING, null, null,
        ConfigDef.Importance.HIGH, "doc", group, 10,
        ConfigDef.Width.MEDIUM, "display name", Collections.emptyList(), null, false);
    keys.put(name, configKey);
}
/**
 * Appends a {@link ConfigValue} with the given name, value and error messages (and an empty
 * second list argument) to {@code values}.
 *
 * @param values list to append to
 * @param name   name of the config value
 * @param value  the value itself
 * @param errors zero or more error messages attached to the value
 */
protected void addValue(List<ConfigValue> values, String name, String value, String...errors) {
    ConfigValue configValue = new ConfigValue(name, value, new ArrayList<>(), Arrays.asList(errors));
    values.add(configValue);
}
/**
 * Asserts that {@code infos} contains an entry named {@code name} whose config key has that name
 * and the given group.
 *
 * @param infos result to search
 * @param name  expected config name
 * @param group expected group of the config key; may be {@code null}
 */
protected void assertInfoKey(ConfigInfos infos, String name, String group) {
    ConfigInfo info = findInfo(infos, name);
    // findInfo returns null when nothing matches; fail with a readable message instead of an NPE
    assertNotNull("No config info found for '" + name + "'", info);
    assertNotNull("Config info for '" + name + "' has no config key", info.configKey());
    assertEquals(name, info.configKey().name());
    assertEquals(group, info.configKey().group());
}
/**
 * Asserts that {@code infos} contains an entry named {@code name} and that the entry has no
 * config key.
 *
 * @param infos result to search
 * @param name  config name to look up
 */
protected void assertNoInfoKey(ConfigInfos infos, String name) {
    ConfigInfo info = findInfo(infos, name);
    // findInfo returns null when nothing matches; fail with a readable message instead of an NPE
    assertNotNull("No config info found for '" + name + "'", info);
    assertNull(info.configKey());
}
/**
 * Asserts that {@code infos} contains an entry named {@code name} whose config value has the
 * given name, value and exact list of errors.
 *
 * @param infos  result to search
 * @param name   expected config name
 * @param value  expected value
 * @param errors expected error messages, in order
 */
protected void assertInfoValue(ConfigInfos infos, String name, String value, String...errors) {
    ConfigInfo found = findInfo(infos, name);
    // findInfo returns null when nothing matches; fail with a readable message instead of an NPE
    assertNotNull("No config info found for '" + name + "'", found);
    ConfigValueInfo info = found.configValue();
    assertEquals(name, info.name());
    assertEquals(value, info.value());
    assertEquals(Arrays.asList(errors), info.errors());
}
/**
 * Returns the first entry of {@code infos} whose config value is named {@code name}.
 *
 * @param infos result to search
 * @param name  config name to look for
 * @return the first matching entry, or {@code null} if none matches
 */
protected ConfigInfo findInfo(ConfigInfos infos, String name) {
    for (ConfigInfo candidate : infos.values()) {
        if (candidate.configValue().name().equals(name)) {
            return candidate;
        }
    }
    return null;
}
/**
 * Convenience overload that expects {@code rawConnConfig} to match.
 *
 * @param rawConnConfig raw config value to check
 */
private void testConfigProviderRegex(String rawConnConfig) {
    final boolean shouldMatch = true;
    testConfigProviderRegex(rawConnConfig, shouldMatch);
}
/**
 * Puts {@code rawConnConfig} under the key {@code "key"} in a single-entry map, runs
 * {@code keysWithVariableValues} on it with {@code ConfigTransformer.DEFAULT_PATTERN}, and asserts
 * whether {@code "key"} is among the returned keys.
 *
 * @param rawConnConfig raw config value to check
 * @param expected      whether the value is expected to be reported as containing a variable
 */
private void testConfigProviderRegex(String rawConnConfig, boolean expected) {
    Set<String> keys = keysWithVariableValues(Collections.singletonMap("key", rawConnConfig), ConfigTransformer.DEFAULT_PATTERN);
    // contains() is already false for an empty set, so only the null case needs guarding
    boolean actual = keys != null && keys.contains("key");
    // Word the message according to the expectation so a failure in the negative case is not misleading
    String message = String.format("%s should %shave matched regex", rawConnConfig, expected ? "" : "not ");
    assertEquals(message, expected, actual);
}
/**
 * Convenience overload that passes {@code 1} as {@code countOfCallingNewConnector}.
 *
 * @param connectorClass                      connector class to validate against
 * @param connectorClientConfigOverridePolicy override policy handed to the herder
 * @return the herder produced by the three-argument overload
 */
private AbstractHerder createConfigValidationHerder(Class<? extends Connector> connectorClass,
                                                    ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
    final int singleNewConnectorCall = 1;
    return createConfigValidationHerder(connectorClass, connectorClientConfigOverridePolicy, singleNewConnectorCall);
}
/**
 * Builds a partial mock of {@link AbstractHerder} (real methods by default) and stubs the worker,
 * config transformer and plugins it is given.
 *
 * @param connectorClass                      connector class; instantiated via its no-arg constructor
 * @param connectorClientConfigOverridePolicy override policy passed to the herder constructor
 * @param countOfCallingNewConnector          when greater than zero, the plugin stubs from
 *                                            {@code mockValidationIsolation} are installed
 * @return the partially mocked herder
 * @throws RuntimeException if the connector cannot be instantiated reflectively
 */
private AbstractHerder createConfigValidationHerder(Class<? extends Connector> connectorClass,
                                                    ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy,
                                                    int countOfCallingNewConnector) {
    AbstractHerder herder = mock(AbstractHerder.class, withSettings()
        .useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, connectorClientConfigOverridePolicy)
        .defaultAnswer(CALLS_REAL_METHODS));

    // Call to validateConnectorConfig: the transformer hands back whatever map it was given
    when(worker.configTransformer()).thenReturn(transformer);
    @SuppressWarnings("unchecked")
    final ArgumentCaptor<Map<String, String>> transformInput = ArgumentCaptor.forClass(Map.class);
    when(transformer.transform(transformInput.capture())).thenAnswer(invocation -> transformInput.getValue());
    when(worker.getPlugins()).thenReturn(plugins);

    final Connector connectorInstance;
    try {
        connectorInstance = connectorClass.getConstructor().newInstance();
    } catch (ReflectiveOperationException e) {
        throw new RuntimeException("Couldn't create connector", e);
    }

    if (countOfCallingNewConnector <= 0) {
        return herder;
    }
    mockValidationIsolation(connectorClass.getName(), connectorInstance);
    return herder;
}
/**
 * Stubs {@code plugins} for the given connector class name: {@code newConnector} returns
 * {@code connector}, {@code connectorLoader} returns the test's {@code classLoader}, and
 * {@code withClassLoader(classLoader)} returns {@code loaderSwap}.
 *
 * @param connectorClass connector class name the stubs are keyed on
 * @param connector      instance returned by the {@code newConnector} stub
 */
private void mockValidationIsolation(String connectorClass, Connector connector) {
when(plugins.newConnector(connectorClass)).thenReturn(connector);
when(plugins.connectorLoader(connectorClass)).thenReturn(classLoader);
when(plugins.withClassLoader(classLoader)).thenReturn(loaderSwap);
}
/**
 * Verifies the plugin interactions stubbed by {@code mockValidationIsolation}: a connector was
 * created, {@code withClassLoader} was called with the test's {@code classLoader}, and the
 * returned {@code loaderSwap} was closed.
 */
private void verifyValidationIsolation() {
// verify(...) with no mode checks for exactly one invocation
verify(plugins).newConnector(anyString());
verify(plugins).withClassLoader(classLoader);
verify(loaderSwap).close();
}
/**
 * Prepends the connector-level producer override prefix to a producer config name.
 *
 * @param config producer config name
 * @return {@code ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX} followed by {@code config}
 */
private static String producerOverrideKey(String config) {
    final String overridePrefix = ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX;
    return overridePrefix + config;
}
}