/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.storage;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.apache.kafka.connect.util.TestFuture;
import org.apache.kafka.connect.util.TopicAdmin;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.easymock.PowerMock;
import org.powermock.api.easymock.annotation.Mock;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.reflect.Whitebox;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import static org.apache.kafka.connect.storage.KafkaConfigBackingStore.INCLUDE_TASKS_FIELD_NAME;
import static org.apache.kafka.connect.storage.KafkaConfigBackingStore.ONLY_FAILED_FIELD_NAME;
import static org.apache.kafka.connect.storage.KafkaConfigBackingStore.RESTART_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

@RunWith(PowerMockRunner.class)
@PrepareForTest({KafkaConfigBackingStore.class, WorkerConfig.class})
@PowerMockIgnore({"javax.management.*", "javax.crypto.*"})
public class KafkaConfigBackingStoreTest {
private static final String CLIENT_ID_BASE = "test-client-id-";
private static final String TOPIC = "connect-configs";
private static final short TOPIC_REPLICATION_FACTOR = 5;
private static final Map<String, String> DEFAULT_CONFIG_STORAGE_PROPS = new HashMap<>();
static {
DEFAULT_CONFIG_STORAGE_PROPS.put(DistributedConfig.CONFIG_TOPIC_CONFIG, TOPIC);
DEFAULT_CONFIG_STORAGE_PROPS.put(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "connect-offsets");
DEFAULT_CONFIG_STORAGE_PROPS.put(DistributedConfig.CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG, Short.toString(TOPIC_REPLICATION_FACTOR));
DEFAULT_CONFIG_STORAGE_PROPS.put(DistributedConfig.GROUP_ID_CONFIG, "connect");
DEFAULT_CONFIG_STORAGE_PROPS.put(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG, "status-topic");
DEFAULT_CONFIG_STORAGE_PROPS.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9093");
DEFAULT_CONFIG_STORAGE_PROPS.put(DistributedConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
DEFAULT_CONFIG_STORAGE_PROPS.put(DistributedConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
}
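// The record keys below follow the key formats that KafkaConfigBackingStore uses in the config topic:
// "connector-<name>" for connector configs, "task-<name>-<taskId>" for task configs,
// "commit-<name>" for task-commit records, "target-state-<name>" for target states,
// "tasks-fencing-<name>" for task count records, and RESTART_KEY(<name>) for restart requests.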
private static final List<String> CONNECTOR_IDS = Arrays.asList("connector1", "connector2");
private static final List<String> CONNECTOR_CONFIG_KEYS = Arrays.asList("connector-connector1", "connector-connector2");
private static final List<String> COMMIT_TASKS_CONFIG_KEYS = Arrays.asList("commit-connector1", "commit-connector2");
private static final List<String> TARGET_STATE_KEYS = Arrays.asList("target-state-connector1", "target-state-connector2");
private static final List<String> CONNECTOR_TASK_COUNT_RECORD_KEYS = Arrays.asList("tasks-fencing-connector1", "tasks-fencing-connector2");
private static final String CONNECTOR_1_NAME = "connector1";
private static final String CONNECTOR_2_NAME = "connector2";
private static final List<String> RESTART_CONNECTOR_KEYS = Arrays.asList(RESTART_KEY(CONNECTOR_1_NAME), RESTART_KEY(CONNECTOR_2_NAME));
// Need a) a connector with multiple tasks and b) multiple connectors
private static final List<ConnectorTaskId> TASK_IDS = Arrays.asList(
new ConnectorTaskId("connector1", 0),
new ConnectorTaskId("connector1", 1),
new ConnectorTaskId("connector2", 0)
);
private static final List<String> TASK_CONFIG_KEYS = Arrays.asList("task-connector1-0", "task-connector1-1", "task-connector2-0");
// Need some placeholders -- the contents don't matter here, just that they are restored properly
private static final List<Map<String, String>> SAMPLE_CONFIGS = Arrays.asList(
Collections.singletonMap("config-key-one", "config-value-one"),
Collections.singletonMap("config-key-two", "config-value-two"),
Collections.singletonMap("config-key-three", "config-value-three")
);
private static final List<Struct> CONNECTOR_CONFIG_STRUCTS = Arrays.asList(
new Struct(KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(0)),
new Struct(KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(1)),
new Struct(KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(2))
);
private static final List<Struct> TASK_CONFIG_STRUCTS = Arrays.asList(
new Struct(KafkaConfigBackingStore.TASK_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(0)),
new Struct(KafkaConfigBackingStore.TASK_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(1))
);
private static final Struct TARGET_STATE_STARTED = new Struct(KafkaConfigBackingStore.TARGET_STATE_V0).put("state", "STARTED");
private static final Struct TARGET_STATE_PAUSED = new Struct(KafkaConfigBackingStore.TARGET_STATE_V1)
.put("state", "PAUSED")
.put("state.v2", "PAUSED");
private static final Struct TARGET_STATE_STOPPED = new Struct(KafkaConfigBackingStore.TARGET_STATE_V1)
.put("state", "PAUSED")
.put("state.v2", "STOPPED");
private static final Struct TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR
= new Struct(KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0).put("tasks", 2);
private static final Struct ONLY_FAILED_MISSING_STRUCT = new Struct(KafkaConfigBackingStore.RESTART_REQUEST_V0).put(INCLUDE_TASKS_FIELD_NAME, false);
private static final Struct INCLUDE_TASKS_MISSING_STRUCT = new Struct(KafkaConfigBackingStore.RESTART_REQUEST_V0).put(ONLY_FAILED_FIELD_NAME, true);
private static final List<Struct> RESTART_REQUEST_STRUCTS = Arrays.asList(
new Struct(KafkaConfigBackingStore.RESTART_REQUEST_V0).put(ONLY_FAILED_FIELD_NAME, true).put(INCLUDE_TASKS_FIELD_NAME, false),
ONLY_FAILED_MISSING_STRUCT,
INCLUDE_TASKS_MISSING_STRUCT);
// The exact format doesn't matter here since both conversions are mocked
private static final List<byte[]> CONFIGS_SERIALIZED = Arrays.asList(
"config-bytes-1".getBytes(), "config-bytes-2".getBytes(), "config-bytes-3".getBytes(),
"config-bytes-4".getBytes(), "config-bytes-5".getBytes(), "config-bytes-6".getBytes(),
"config-bytes-7".getBytes(), "config-bytes-8".getBytes(), "config-bytes-9".getBytes()
);

@Mock
private Converter converter;
@Mock
private ConfigBackingStore.UpdateListener configUpdateListener;
private final Map<String, String> props = new HashMap<>(DEFAULT_CONFIG_STORAGE_PROPS);
private DistributedConfig config;
@Mock
KafkaBasedLog<String, byte[]> storeLog;
@Mock
Future<RecordMetadata> producerFuture;
private KafkaConfigBackingStore configStorage;
private final Capture<String> capturedTopic = EasyMock.newCapture();
private final Capture<Map<String, Object>> capturedProducerProps = EasyMock.newCapture();
private final Capture<Map<String, Object>> capturedConsumerProps = EasyMock.newCapture();
private final Capture<Supplier<TopicAdmin>> capturedAdminSupplier = EasyMock.newCapture();
private final Capture<NewTopic> capturedNewTopic = EasyMock.newCapture();
private final Capture<Callback<ConsumerRecord<String, byte[]>>> capturedConsumedCallback = EasyMock.newCapture();
private final MockTime time = new MockTime();
private long logOffset = 0;

private void createStore() {
config = PowerMock.createPartialMock(
DistributedConfig.class,
new String[]{"kafkaClusterId"},
props);
EasyMock.expect(config.kafkaClusterId()).andReturn("test-cluster").anyTimes();
// The kafkaClusterId is used in the constructor for KafkaConfigBackingStore
// So temporarily enter replay mode in order to mock that call
EasyMock.replay(config);
Supplier<TopicAdmin> topicAdminSupplier = () -> null;
configStorage = PowerMock.createPartialMock(
KafkaConfigBackingStore.class,
new String[]{"createKafkaBasedLog", "createFencableProducer"},
converter, config, null, topicAdminSupplier, CLIENT_ID_BASE, time);
Whitebox.setInternalState(configStorage, "configLog", storeLog);
configStorage.setUpdateListener(configUpdateListener);
// The mock must be reset and re-mocked for the remainder of the test.
// TODO: Once this migrates to Mockito, just use a spy()
EasyMock.reset(config);
EasyMock.expect(config.kafkaClusterId()).andReturn("test-cluster").anyTimes();
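// A rough Mockito-based sketch of what this could become after that migration (hypothetical, not used here):
//   DistributedConfig config = Mockito.spy(new DistributedConfig(props));
//   Mockito.doReturn("test-cluster").when(config).kafkaClusterId();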
}

@Before
public void setUp() {
createStore();
}

@Test
public void testTaskCountRecordsAndGenerations() throws Exception {
expectConfigure();
expectStart(Collections.emptyList(), Collections.emptyMap());
// Task configs should read to end, write to the log, read to end, write root, then read to end again
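// Concretely, putTaskConfigs("connector1", [config0, config1]) should produce writes keyed
// "task-connector1-0" and "task-connector1-1", followed by a "commit-connector1" record with tasks=2.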
expectReadToEnd(new LinkedHashMap<>());
expectConvertWriteRead(
TASK_CONFIG_KEYS.get(0), KafkaConfigBackingStore.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(0),
"properties", SAMPLE_CONFIGS.get(0));
expectConvertWriteRead(
TASK_CONFIG_KEYS.get(1), KafkaConfigBackingStore.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(1),
"properties", SAMPLE_CONFIGS.get(1));
expectReadToEnd(new LinkedHashMap<>());
expectConvertWriteRead(
COMMIT_TASKS_CONFIG_KEYS.get(0), KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(2),
"tasks", 2); // Starts with 0 tasks, after update has 2
// As soon as root is rewritten, we should see a callback notifying us that we reconfigured some tasks
configUpdateListener.onTaskConfigUpdate(Arrays.asList(TASK_IDS.get(0), TASK_IDS.get(1)));
EasyMock.expectLastCall();
// Records to be read by consumer as it reads to the end of the log
LinkedHashMap<String, byte[]> serializedConfigs = new LinkedHashMap<>();
serializedConfigs.put(TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0));
serializedConfigs.put(TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(1));
serializedConfigs.put(COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(2));
expectReadToEnd(serializedConfigs);
// Task count records are read back after writing as well
expectConvertWriteRead(
CONNECTOR_TASK_COUNT_RECORD_KEYS.get(0), KafkaConfigBackingStore.TASK_COUNT_RECORD_V0, CONFIGS_SERIALIZED.get(3),
"task-count", 4);
serializedConfigs = new LinkedHashMap<>();
serializedConfigs.put(CONNECTOR_TASK_COUNT_RECORD_KEYS.get(0), CONFIGS_SERIALIZED.get(3));
expectReadToEnd(serializedConfigs);
expectPartitionCount(1);
expectStop();
PowerMock.replayAll();
configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
configStorage.start();
// Bootstrap as if we had already added the connector, but no tasks had been added yet
whiteboxAddConnector(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0), Collections.emptyList());
// Before anything is written
String connectorName = CONNECTOR_IDS.get(0);
ClusterConfigState configState = configStorage.snapshot();
assertFalse(configState.pendingFencing(connectorName));
assertNull(configState.taskCountRecord(connectorName));
assertNull(configState.taskConfigGeneration(connectorName));
// Writing task configs should block until all the writes have been performed and the root record update
// has completed
List<Map<String, String>> taskConfigs = Arrays.asList(SAMPLE_CONFIGS.get(0), SAMPLE_CONFIGS.get(1));
configStorage.putTaskConfigs("connector1", taskConfigs);
configState = configStorage.snapshot();
assertEquals(3, configState.offset());
assertTrue(configState.pendingFencing(connectorName));
assertNull(configState.taskCountRecord(connectorName));
assertEquals(0, (long) configState.taskConfigGeneration(connectorName));
configStorage.putTaskCountRecord(connectorName, 4);
configState = configStorage.snapshot();
assertEquals(4, configState.offset());
assertFalse(configState.pendingFencing(connectorName));
assertEquals(4, (long) configState.taskCountRecord(connectorName));
assertEquals(0, (long) configState.taskConfigGeneration(connectorName));
configStorage.stop();
PowerMock.verifyAll();
}

@Test
public void testPutTaskConfigs() throws Exception {
expectConfigure();
expectStart(Collections.emptyList(), Collections.emptyMap());
// Task configs should read to end, write to the log, read to end, write root, then read to end again
expectReadToEnd(new LinkedHashMap<>());
expectConvertWriteRead(
TASK_CONFIG_KEYS.get(0), KafkaConfigBackingStore.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(0),
"properties", SAMPLE_CONFIGS.get(0));
expectConvertWriteRead(
TASK_CONFIG_KEYS.get(1), KafkaConfigBackingStore.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(1),
"properties", SAMPLE_CONFIGS.get(1));
expectReadToEnd(new LinkedHashMap<>());
expectConvertWriteRead(
COMMIT_TASKS_CONFIG_KEYS.get(0), KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(2),
"tasks", 2); // Starts with 0 tasks, after update has 2
// As soon as root is rewritten, we should see a callback notifying us that we reconfigured some tasks
configUpdateListener.onTaskConfigUpdate(Arrays.asList(TASK_IDS.get(0), TASK_IDS.get(1)));
EasyMock.expectLastCall();
// Records to be read by consumer as it reads to the end of the log
LinkedHashMap<String, byte[]> serializedConfigs = new LinkedHashMap<>();
serializedConfigs.put(TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0));
serializedConfigs.put(TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(1));
serializedConfigs.put(COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(2));
expectReadToEnd(serializedConfigs);
expectPartitionCount(1);
expectStop();
PowerMock.replayAll();
configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
configStorage.start();
// Bootstrap as if we had already added the connector, but no tasks had been added yet
whiteboxAddConnector(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0), Collections.emptyList());
// Null before writing
ClusterConfigState configState = configStorage.snapshot();
assertEquals(-1, configState.offset());
assertNull(configState.taskConfig(TASK_IDS.get(0)));
assertNull(configState.taskConfig(TASK_IDS.get(1)));
// Writing task configs should block until all the writes have been performed and the root record update
// has completed
List<Map<String, String>> taskConfigs = Arrays.asList(SAMPLE_CONFIGS.get(0), SAMPLE_CONFIGS.get(1));
configStorage.putTaskConfigs("connector1", taskConfigs);
// Validate root config by listing all connectors and tasks
configState = configStorage.snapshot();
assertEquals(3, configState.offset());
String connectorName = CONNECTOR_IDS.get(0);
assertEquals(Collections.singletonList(connectorName), new ArrayList<>(configState.connectors()));
assertEquals(Arrays.asList(TASK_IDS.get(0), TASK_IDS.get(1)), configState.tasks(connectorName));
assertEquals(SAMPLE_CONFIGS.get(0), configState.taskConfig(TASK_IDS.get(0)));
assertEquals(SAMPLE_CONFIGS.get(1), configState.taskConfig(TASK_IDS.get(1)));
assertEquals(Collections.EMPTY_SET, configState.inconsistentConnectors());
configStorage.stop();
PowerMock.verifyAll();
}

@Test
public void testPutTaskConfigsStartsOnlyReconfiguredTasks() throws Exception {
expectConfigure();
expectStart(Collections.emptyList(), Collections.emptyMap());
// Task configs should read to end, write to the log, read to end, write root, then read to end again
expectReadToEnd(new LinkedHashMap<>());
expectConvertWriteRead(
TASK_CONFIG_KEYS.get(0), KafkaConfigBackingStore.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(0),
"properties", SAMPLE_CONFIGS.get(0));
expectConvertWriteRead(
TASK_CONFIG_KEYS.get(1), KafkaConfigBackingStore.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(1),
"properties", SAMPLE_CONFIGS.get(1));
expectReadToEnd(new LinkedHashMap<>());
expectConvertWriteRead(
COMMIT_TASKS_CONFIG_KEYS.get(0), KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(2),
"tasks", 2); // Starts with 0 tasks, after update has 2
// As soon as root is rewritten, we should see a callback notifying us that we reconfigured some tasks
configUpdateListener.onTaskConfigUpdate(Arrays.asList(TASK_IDS.get(0), TASK_IDS.get(1)));
EasyMock.expectLastCall();
// Records to be read by consumer as it reads to the end of the log
LinkedHashMap<String, byte[]> serializedConfigs = new LinkedHashMap<>();
serializedConfigs.put(TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0));
serializedConfigs.put(TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(1));
serializedConfigs.put(COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(2));
expectReadToEnd(serializedConfigs);
// Task configs should read to end, write to the log, read to end, write root, then read to end again
expectReadToEnd(new LinkedHashMap<>());
expectConvertWriteRead(
TASK_CONFIG_KEYS.get(2), KafkaConfigBackingStore.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(3),
"properties", SAMPLE_CONFIGS.get(2));
expectReadToEnd(new LinkedHashMap<>());
expectConvertWriteRead(
COMMIT_TASKS_CONFIG_KEYS.get(1), KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(4),
"tasks", 1); // Starts with 2 tasks, after update has 3
// As soon as root is rewritten, we should see a callback notifying us that we reconfigured some tasks
configUpdateListener.onTaskConfigUpdate(Collections.singletonList(TASK_IDS.get(2)));
EasyMock.expectLastCall();
// Records to be read by consumer as it reads to the end of the log
serializedConfigs = new LinkedHashMap<>();
serializedConfigs.put(TASK_CONFIG_KEYS.get(2), CONFIGS_SERIALIZED.get(3));
serializedConfigs.put(COMMIT_TASKS_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(4));
expectReadToEnd(serializedConfigs);
expectPartitionCount(1);
expectStop();
PowerMock.replayAll();
configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
configStorage.start();
// Bootstrap as if we had already added the connector, but no tasks had been added yet
whiteboxAddConnector(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0), Collections.emptyList());
whiteboxAddConnector(CONNECTOR_IDS.get(1), SAMPLE_CONFIGS.get(1), Collections.emptyList());
// Null before writing
ClusterConfigState configState = configStorage.snapshot();
assertEquals(-1, configState.offset());
assertNull(configState.taskConfig(TASK_IDS.get(0)));
assertNull(configState.taskConfig(TASK_IDS.get(1)));
// Writing task configs should block until all the writes have been performed and the root record update
// has completed
List<Map<String, String>> taskConfigs = Arrays.asList(SAMPLE_CONFIGS.get(0), SAMPLE_CONFIGS.get(1));
configStorage.putTaskConfigs("connector1", taskConfigs);
taskConfigs = Collections.singletonList(SAMPLE_CONFIGS.get(2));
configStorage.putTaskConfigs("connector2", taskConfigs);
// Validate root config by listing all connectors and tasks
configState = configStorage.snapshot();
assertEquals(5, configState.offset());
String connectorName1 = CONNECTOR_IDS.get(0);
String connectorName2 = CONNECTOR_IDS.get(1);
assertEquals(Arrays.asList(connectorName1, connectorName2), new ArrayList<>(configState.connectors()));
assertEquals(Arrays.asList(TASK_IDS.get(0), TASK_IDS.get(1)), configState.tasks(connectorName1));
assertEquals(Collections.singletonList(TASK_IDS.get(2)), configState.tasks(connectorName2));
assertEquals(SAMPLE_CONFIGS.get(0), configState.taskConfig(TASK_IDS.get(0)));
assertEquals(SAMPLE_CONFIGS.get(1), configState.taskConfig(TASK_IDS.get(1)));
assertEquals(SAMPLE_CONFIGS.get(2), configState.taskConfig(TASK_IDS.get(2)));
assertEquals(Collections.EMPTY_SET, configState.inconsistentConnectors());
configStorage.stop();
PowerMock.verifyAll();
}

@Test
public void testPutTaskConfigsZeroTasks() throws Exception {
expectConfigure();
expectStart(Collections.emptyList(), Collections.emptyMap());
// Task configs should read to end, write to the log, read to end, write root.
expectReadToEnd(new LinkedHashMap<>());
expectConvertWriteRead(
COMMIT_TASKS_CONFIG_KEYS.get(0), KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(0),
"tasks", 0); // We have 0 tasks
// As soon as root is rewritten, we should see a callback notifying us that we reconfigured some tasks
configUpdateListener.onTaskConfigUpdate(Collections.emptyList());
EasyMock.expectLastCall();
// Records to be read by consumer as it reads to the end of the log
LinkedHashMap<String, byte[]> serializedConfigs = new LinkedHashMap<>();
serializedConfigs.put(COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0));
expectReadToEnd(serializedConfigs);
expectPartitionCount(1);
expectStop();
PowerMock.replayAll();
configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
configStorage.start();
// Bootstrap as if we had already added the connector, but no tasks had been added yet
whiteboxAddConnector(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0), Collections.emptyList());
// Null before writing
ClusterConfigState configState = configStorage.snapshot();
assertEquals(-1, configState.offset());
// Writing task configs should block until all the writes have been performed and the root record update
// has completed
List<Map<String, String>> taskConfigs = Collections.emptyList();
configStorage.putTaskConfigs("connector1", taskConfigs);
// Validate root config by listing all connectors and tasks
configState = configStorage.snapshot();
assertEquals(1, configState.offset());
String connectorName = CONNECTOR_IDS.get(0);
assertEquals(Collections.singletonList(connectorName), new ArrayList<>(configState.connectors()));
assertEquals(Collections.emptyList(), configState.tasks(connectorName));
assertEquals(Collections.EMPTY_SET, configState.inconsistentConnectors());
configStorage.stop();
PowerMock.verifyAll();
}

@Test
public void testBackgroundUpdateTargetState() throws Exception {
// verify that we handle target state changes correctly when they come up through the log
expectConfigure();
List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1),
CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty()));
LinkedHashMap<byte[], Struct> deserializedOnStartup = new LinkedHashMap<>();
deserializedOnStartup.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
deserializedOnStartup.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
deserializedOnStartup.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
deserializedOnStartup.put(CONFIGS_SERIALIZED.get(3), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
logOffset = 5;
expectStart(existingRecords, deserializedOnStartup);
LinkedHashMap<String, byte[]> serializedAfterStartup = new LinkedHashMap<>();
serializedAfterStartup.put(TARGET_STATE_KEYS.get(0), CONFIGS_SERIALIZED.get(0));
serializedAfterStartup.put(TARGET_STATE_KEYS.get(1), CONFIGS_SERIALIZED.get(1));
Map<String, Struct> deserializedAfterStartup = new HashMap<>();
deserializedAfterStartup.put(TARGET_STATE_KEYS.get(0), TARGET_STATE_PAUSED);
deserializedAfterStartup.put(TARGET_STATE_KEYS.get(1), TARGET_STATE_STOPPED);
expectRead(serializedAfterStartup, deserializedAfterStartup);
configUpdateListener.onConnectorTargetStateChange(CONNECTOR_IDS.get(0));
EasyMock.expectLastCall();
expectPartitionCount(1);
expectStop();
PowerMock.replayAll();
configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
configStorage.start();
// Should see a single connector with initial state started
ClusterConfigState configState = configStorage.snapshot();
assertEquals(Collections.singleton(CONNECTOR_IDS.get(0)), configStorage.connectorTargetStates.keySet());
assertEquals(TargetState.STARTED, configState.targetState(CONNECTOR_IDS.get(0)));
// Should see two connectors now, one paused and one stopped
configStorage.refresh(0, TimeUnit.SECONDS);
configState = configStorage.snapshot();
assertEquals(new HashSet<>(CONNECTOR_IDS), configStorage.connectorTargetStates.keySet());
assertEquals(TargetState.PAUSED, configState.targetState(CONNECTOR_IDS.get(0)));
assertEquals(TargetState.STOPPED, configState.targetState(CONNECTOR_IDS.get(1)));
configStorage.stop();
PowerMock.verifyAll();
}

@Test
public void testSameTargetState() throws Exception {
// verify that we handle target state changes correctly when they come up through the log
expectConfigure();
List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1),
CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty()));
LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
deserialized.put(CONFIGS_SERIALIZED.get(3), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
logOffset = 5;
expectStart(existingRecords, deserialized);
// Reading back a target state identical to the current one (an effective no-op resume) shouldn't invoke the update listener
configUpdateListener.onConnectorTargetStateChange(EasyMock.anyString());
EasyMock.expectLastCall().andStubThrow(new AssertionError("unexpected call to onConnectorTargetStateChange"));
expectRead(TARGET_STATE_KEYS.get(0), CONFIGS_SERIALIZED.get(0), TARGET_STATE_STARTED);
expectPartitionCount(1);
expectStop();
PowerMock.replayAll();
configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
configStorage.start();
// Should see a single connector with initial state started
ClusterConfigState configState = configStorage.snapshot();
assertEquals(TargetState.STARTED, configState.targetState(CONNECTOR_IDS.get(0)));
configStorage.refresh(0, TimeUnit.SECONDS);
configStorage.stop();
PowerMock.verifyAll();
}

private void expectConfigure() throws Exception {
PowerMock.expectPrivate(configStorage, "createKafkaBasedLog",
EasyMock.capture(capturedTopic), EasyMock.capture(capturedProducerProps),
EasyMock.capture(capturedConsumerProps), EasyMock.capture(capturedConsumedCallback),
EasyMock.capture(capturedNewTopic), EasyMock.capture(capturedAdminSupplier),
EasyMock.anyObject(WorkerConfig.class), EasyMock.anyObject(Time.class))
.andReturn(storeLog);
}

private void expectPartitionCount(int partitionCount) {
EasyMock.expect(storeLog.partitionCount())
.andReturn(partitionCount);
}

// If non-empty, deserializations should be a LinkedHashMap
private void expectStart(final List<ConsumerRecord<String, byte[]>> preexistingRecords,
final Map<byte[], Struct> deserializations) {
storeLog.start();
PowerMock.expectLastCall().andAnswer(() -> {
for (ConsumerRecord<String, byte[]> rec : preexistingRecords)
capturedConsumedCallback.getValue().onCompletion(null, rec);
return null;
});
for (Map.Entry<byte[], Struct> deserializationEntry : deserializations.entrySet()) {
// Note null schema because default settings for internal serialization are schema-less
EasyMock.expect(converter.toConnectData(EasyMock.eq(TOPIC), EasyMock.aryEq(deserializationEntry.getKey())))
.andReturn(new SchemaAndValue(null, structToMap(deserializationEntry.getValue())));
}
}

private void expectStop() {
storeLog.stop();
PowerMock.expectLastCall();
}

private void expectRead(LinkedHashMap<String, byte[]> serializedValues,
Map<String, Struct> deserializedValues) {
expectReadToEnd(serializedValues);
for (Map.Entry<String, Struct> deserializedValueEntry : deserializedValues.entrySet()) {
byte[] serializedValue = serializedValues.get(deserializedValueEntry.getKey());
EasyMock.expect(converter.toConnectData(EasyMock.eq(TOPIC), EasyMock.aryEq(serializedValue)))
.andReturn(new SchemaAndValue(null, structToMap(deserializedValueEntry.getValue())));
}
}

private void expectRead(final String key, final byte[] serializedValue, Struct deserializedValue) {
LinkedHashMap<String, byte[]> serializedData = new LinkedHashMap<>();
serializedData.put(key, serializedValue);
expectRead(serializedData, Collections.singletonMap(key, deserializedValue));
}

// Expect a conversion & write to the underlying log, followed by a subsequent read when the data is consumed back
// from the log. Validate the data that is captured when the conversion is performed matches the specified data
// (by checking a single field's value)
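// For example, expectConvertWriteRead(TASK_CONFIG_KEYS.get(0), TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(0),
// "properties", SAMPLE_CONFIGS.get(0)) expects one fromConnectData call, one sendWithReceipt to the log, and one
// later toConnectData of the same bytes, asserting that the captured Struct carries that "properties" value.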
private void expectConvertWriteRead(final String configKey, final Schema valueSchema, final byte[] serialized,
final String dataFieldName, final Object dataFieldValue) throws Exception {
final Capture<Struct> capturedRecord = EasyMock.newCapture();
if (serialized != null)
EasyMock.expect(converter.fromConnectData(EasyMock.eq(TOPIC), EasyMock.eq(valueSchema), EasyMock.capture(capturedRecord)))
.andReturn(serialized);
storeLog.sendWithReceipt(EasyMock.eq(configKey), EasyMock.aryEq(serialized));
EasyMock.expectLastCall().andReturn(producerFuture);
producerFuture.get(EasyMock.anyLong(), EasyMock.anyObject());
EasyMock.expectLastCall().andReturn(null);
EasyMock.expect(converter.toConnectData(EasyMock.eq(TOPIC), EasyMock.aryEq(serialized)))
.andAnswer(() -> {
if (dataFieldName != null)
assertEquals(dataFieldValue, capturedRecord.getValue().get(dataFieldName));
// Note null schema because default settings for internal serialization are schema-less
return new SchemaAndValue(null, serialized == null ? null : structToMap(capturedRecord.getValue()));
});
}

// This map needs to maintain ordering
private void expectReadToEnd(final LinkedHashMap<String, byte[]> serializedConfigs) {
EasyMock.expect(storeLog.readToEnd())
.andAnswer(() -> {
TestFuture<Void> future = new TestFuture<>();
for (Map.Entry<String, byte[]> entry : serializedConfigs.entrySet()) {
capturedConsumedCallback.getValue().onCompletion(null,
new ConsumerRecord<>(TOPIC, 0, logOffset++, 0L, TimestampType.CREATE_TIME, 0, 0,
entry.getKey(), entry.getValue(), new RecordHeaders(), Optional.empty()));
}
future.resolveOnGet((Void) null);
return future;
});
}

// Manually insert a connector into config storage, updating the task configs, connector config, and root config
private void whiteboxAddConnector(String connectorName, Map<String, String> connectorConfig, List<Map<String, String>> taskConfigs) {
Map<ConnectorTaskId, Map<String, String>> storageTaskConfigs = Whitebox.getInternalState(configStorage, "taskConfigs");
for (int i = 0; i < taskConfigs.size(); i++)
storageTaskConfigs.put(new ConnectorTaskId(connectorName, i), taskConfigs.get(i));
Map<String, Map<String, String>> connectorConfigs = Whitebox.getInternalState(configStorage, "connectorConfigs");
connectorConfigs.put(connectorName, connectorConfig);
Whitebox.<Map<String, Integer>>getInternalState(configStorage, "connectorTaskCounts").put(connectorName, taskConfigs.size());
}

// Generates a Map representation of Struct. Only does shallow traversal, so nested structs are not converted
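// e.g. structToMap(CONNECTOR_CONFIG_STRUCTS.get(0)) yields a map with the single entry "properties" -> SAMPLE_CONFIGS.get(0)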
private Map<String, Object> structToMap(Struct struct) {
if (struct == null)
return null;
HashMap<String, Object> result = new HashMap<>();
for (Field field : struct.schema().fields())
result.put(field.name(), struct.get(field));
return result;
}
}