/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.storage;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.runtime.TopicStatus;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.KafkaBasedLog;

import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.junit.MockitoJUnitRunner;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.concurrent.ConcurrentHashMap;

import static org.apache.kafka.connect.json.JsonConverterConfig.SCHEMAS_ENABLE_CONFIG;
import static org.apache.kafka.connect.storage.KafkaStatusBackingStore.CONNECTOR_STATUS_PREFIX;
import static org.apache.kafka.connect.storage.KafkaStatusBackingStore.TASK_STATUS_PREFIX;
import static org.apache.kafka.connect.storage.KafkaStatusBackingStore.TOPIC_STATUS_PREFIX;
import static org.apache.kafka.connect.storage.KafkaStatusBackingStore.TOPIC_STATUS_SEPARATOR;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;

@SuppressWarnings("unchecked")
@RunWith(MockitoJUnitRunner.StrictStubs.class)
public class KafkaStatusBackingStoreFormatTest {
private static final String STATUS_TOPIC = "status-topic";
private static final String FOO_TOPIC = "foo-topic";
private static final String FOO_CONNECTOR = "foo-source";
private static final String BAR_TOPIC = "bar-topic";
private Time time;
private KafkaStatusBackingStore store;
    private final KafkaBasedLog<String, byte[]> kafkaBasedLog = mock(KafkaBasedLog.class);

    @Before
public void setup() {
time = new MockTime();
JsonConverter converter = new JsonConverter();
converter.configure(Collections.singletonMap(SCHEMAS_ENABLE_CONFIG, false), false);
        // share the mock clock with the store so tests can advance time deterministically
        store = new KafkaStatusBackingStore(time, converter, STATUS_TOPIC, () -> null, kafkaBasedLog);
}
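
    // Records whose keys do not match a known status prefix, or are missing the
    // connector, task, or topic name components, should be ignored without
    // mutating the store's in-memory state.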
@Test
public void readInvalidStatus() {
String key = "status-unknown";
byte[] value = new byte[0];
ConsumerRecord<String, byte[]> statusRecord = new ConsumerRecord<>(STATUS_TOPIC, 0, 0, key, value);
assertTrue(store.connectors().isEmpty());
assertTrue(store.tasks.isEmpty());
assertTrue(store.topics.isEmpty());
store.read(statusRecord);
assertTrue(store.connectors().isEmpty());
assertTrue(store.tasks.isEmpty());
assertTrue(store.topics.isEmpty());
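
        // connector status prefix with an empty connector name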
key = CONNECTOR_STATUS_PREFIX;
statusRecord = new ConsumerRecord<>(STATUS_TOPIC, 0, 0, key, value);
assertTrue(store.connectors().isEmpty());
store.read(statusRecord);
assertTrue(store.connectors().isEmpty());
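
        // task status prefix with no connector name or task id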
key = TASK_STATUS_PREFIX;
statusRecord = new ConsumerRecord<>(STATUS_TOPIC, 0, 0, key, value);
assertTrue(store.tasks.isEmpty());
store.read(statusRecord);
assertTrue(store.tasks.isEmpty());
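
        // task id suffix that is not a parsable integer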
key = TASK_STATUS_PREFIX + FOO_CONNECTOR + "-#";
statusRecord = new ConsumerRecord<>(STATUS_TOPIC, 0, 0, key, value);
assertTrue(store.tasks.isEmpty());
store.read(statusRecord);
assertTrue(store.tasks.isEmpty());
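
        // topic status prefix with no topic or connector name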
key = TOPIC_STATUS_PREFIX;
statusRecord = new ConsumerRecord<>(STATUS_TOPIC, 0, 0, key, value);
assertTrue(store.topics.isEmpty());
store.read(statusRecord);
assertTrue(store.topics.isEmpty());
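
        // separator present but topic and connector names are empty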
key = TOPIC_STATUS_PREFIX + TOPIC_STATUS_SEPARATOR;
statusRecord = new ConsumerRecord<>(STATUS_TOPIC, 0, 0, key, value);
assertTrue(store.topics.isEmpty());
store.read(statusRecord);
assertTrue(store.topics.isEmpty());
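
        // a bare ":" is not the full topic status separator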
key = TOPIC_STATUS_PREFIX + FOO_TOPIC + ":";
statusRecord = new ConsumerRecord<>(STATUS_TOPIC, 0, 0, key, value);
assertTrue(store.topics.isEmpty());
store.read(statusRecord);
assertTrue(store.topics.isEmpty());
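
        // valid topic name and separator but an empty connector name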
key = TOPIC_STATUS_PREFIX + FOO_TOPIC + TOPIC_STATUS_SEPARATOR;
statusRecord = new ConsumerRecord<>(STATUS_TOPIC, 0, 0, key, value);
assertTrue(store.topics.isEmpty());
store.read(statusRecord);
assertTrue(store.topics.isEmpty());
}
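
    // Values that cannot be deserialized into a status should be logged and
    // skipped without mutating the store's in-memory state.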
@Test
public void readInvalidStatusValue() {
String key = CONNECTOR_STATUS_PREFIX + FOO_CONNECTOR;
byte[] value = "invalid".getBytes();
ConsumerRecord<String, byte[]> statusRecord = new ConsumerRecord<>(STATUS_TOPIC, 0, 0, key, value);
assertTrue(store.connectors().isEmpty());
store.read(statusRecord);
assertTrue(store.connectors().isEmpty());
key = TASK_STATUS_PREFIX + FOO_CONNECTOR + "-0";
statusRecord = new ConsumerRecord<>(STATUS_TOPIC, 0, 0, key, value);
assertTrue(store.tasks.isEmpty());
store.read(statusRecord);
assertTrue(store.tasks.isEmpty());
key = TOPIC_STATUS_PREFIX + FOO_TOPIC + TOPIC_STATUS_SEPARATOR + FOO_CONNECTOR;
statusRecord = new ConsumerRecord<>(STATUS_TOPIC, 0, 0, key, value);
assertTrue(store.topics.isEmpty());
store.read(statusRecord);
assertTrue(store.topics.isEmpty());
}
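
    // A well-formed topic status record is deserialized and cached first by
    // connector name and then by topic name.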
@Test
public void readTopicStatus() {
TopicStatus topicStatus = new TopicStatus(FOO_TOPIC, new ConnectorTaskId(FOO_CONNECTOR, 0), Time.SYSTEM.milliseconds());
String key = TOPIC_STATUS_PREFIX + FOO_TOPIC + TOPIC_STATUS_SEPARATOR + FOO_CONNECTOR;
byte[] value = store.serializeTopicStatus(topicStatus);
ConsumerRecord<String, byte[]> statusRecord = new ConsumerRecord<>(STATUS_TOPIC, 0, 0, key, value);
store.read(statusRecord);
assertTrue(store.topics.containsKey(FOO_CONNECTOR));
assertTrue(store.topics.get(FOO_CONNECTOR).containsKey(FOO_TOPIC));
assertEquals(topicStatus, store.topics.get(FOO_CONNECTOR).get(FOO_TOPIC));
}
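
    // A tombstone (null-valued) record removes the corresponding topic status
    // from the store.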
@Test
public void deleteTopicStatus() {
TopicStatus topicStatus = new TopicStatus("foo", new ConnectorTaskId("bar", 0), Time.SYSTEM.milliseconds());
store.topics.computeIfAbsent("bar", k -> new ConcurrentHashMap<>()).put("foo", topicStatus);
assertTrue(store.topics.containsKey("bar"));
assertTrue(store.topics.get("bar").containsKey("foo"));
assertEquals(topicStatus, store.topics.get("bar").get("foo"));
        // serializing a null status yields a null value, i.e. a tombstone record
        byte[] value = store.serializeTopicStatus(null);
        ConsumerRecord<String, byte[]> statusRecord = new ConsumerRecord<>(STATUS_TOPIC, 0, 0, TOPIC_STATUS_PREFIX + "foo" + TOPIC_STATUS_SEPARATOR + "bar", value);
store.read(statusRecord);
assertTrue(store.topics.containsKey("bar"));
assertFalse(store.topics.get("bar").containsKey("foo"));
assertEquals(Collections.emptyMap(), store.topics.get("bar"));
}
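
    // A successful put sends the serialized status to the log, but the new
    // state only becomes visible once the record is read back.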
@Test
public void putTopicState() {
TopicStatus topicStatus = new TopicStatus(FOO_TOPIC, new ConnectorTaskId(FOO_CONNECTOR, 0), time.milliseconds());
String key = TOPIC_STATUS_PREFIX + FOO_TOPIC + TOPIC_STATUS_SEPARATOR + FOO_CONNECTOR;
ArgumentCaptor<byte[]> valueCaptor = ArgumentCaptor.forClass(byte[].class);
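        // complete the send callback successfully and capture the serialized value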
doAnswer(invocation -> {
((Callback) invocation.getArgument(2)).onCompletion(null, null);
return null;
}).when(kafkaBasedLog).send(eq(key), valueCaptor.capture(), any(Callback.class));
store.put(topicStatus);
        // verify that the captured value deserializes back to the original status
assertEquals(topicStatus, store.parseTopicStatus(valueCaptor.getValue()));
// state is not visible until read back from the log
assertNull(store.getTopic(FOO_CONNECTOR, FOO_TOPIC));
ConsumerRecord<String, byte[]> statusRecord = new ConsumerRecord<>(STATUS_TOPIC, 0, 0, key, valueCaptor.getValue());
store.read(statusRecord);
assertEquals(topicStatus, store.getTopic(FOO_CONNECTOR, FOO_TOPIC));
assertEquals(Collections.singleton(topicStatus), new HashSet<>(store.getAllTopics(FOO_CONNECTOR)));
}
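
    // A retriable send failure (e.g. a TimeoutException) should cause the same
    // record to be sent again.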
@Test
public void putTopicStateRetriableFailure() {
TopicStatus topicStatus = new TopicStatus(FOO_TOPIC, new ConnectorTaskId(FOO_CONNECTOR, 0), time.milliseconds());
String key = TOPIC_STATUS_PREFIX + FOO_TOPIC + TOPIC_STATUS_SEPARATOR + FOO_CONNECTOR;
ArgumentCaptor<byte[]> valueCaptor = ArgumentCaptor.forClass(byte[].class);
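        // fail the first send with a retriable error, then succeed on the retry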
doAnswer(invocation -> {
((Callback) invocation.getArgument(2)).onCompletion(null, new TimeoutException());
return null;
}).doAnswer(invocation -> {
((Callback) invocation.getArgument(2)).onCompletion(null, null);
return null;
}).when(kafkaBasedLog).send(eq(key), valueCaptor.capture(), any(Callback.class));
store.put(topicStatus);
verify(kafkaBasedLog, timeout(1000).times(2)).send(any(), any(), any());
        // verify that the captured value deserializes back to the original status
assertEquals(topicStatus, store.parseTopicStatus(valueCaptor.getValue()));
// state is not visible until read back from the log
assertNull(store.getTopic(FOO_CONNECTOR, FOO_TOPIC));
}
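
    // A non-retriable send failure is logged and ignored: the record is not
    // retried and the state never becomes visible.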
@Test
public void putTopicStateNonRetriableFailure() {
TopicStatus topicStatus = new TopicStatus(FOO_TOPIC, new ConnectorTaskId(FOO_CONNECTOR, 0), time.milliseconds());
String key = TOPIC_STATUS_PREFIX + FOO_TOPIC + TOPIC_STATUS_SEPARATOR + FOO_CONNECTOR;
ArgumentCaptor<byte[]> valueCaptor = ArgumentCaptor.forClass(byte[].class);
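        // fail the send with a non-retriable error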
doAnswer(invocation -> {
((Callback) invocation.getArgument(2)).onCompletion(null, new UnknownServerException());
return null;
}).when(kafkaBasedLog).send(eq(key), valueCaptor.capture(), any(Callback.class));
// the error is logged and ignored
store.put(topicStatus);
        // verify that the captured value deserializes back to the original status
assertEquals(topicStatus, store.parseTopicStatus(valueCaptor.getValue()));
// state is not visible until read back from the log
assertNull(store.getTopic(FOO_CONNECTOR, FOO_TOPIC));
}
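
    // Putting a status for a second topic of the same connector should leave
    // the first topic's status intact and report both topics for the connector.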
@Test
public void putTopicStateShouldOverridePreviousState() {
        TopicStatus firstTopicStatus = new TopicStatus(FOO_TOPIC, new ConnectorTaskId(FOO_CONNECTOR, 0), time.milliseconds());
        time.sleep(1000);
        TopicStatus secondTopicStatus = new TopicStatus(BAR_TOPIC, new ConnectorTaskId(FOO_CONNECTOR, 0), time.milliseconds());
String firstKey = TOPIC_STATUS_PREFIX + FOO_TOPIC + TOPIC_STATUS_SEPARATOR + FOO_CONNECTOR;
String secondKey = TOPIC_STATUS_PREFIX + BAR_TOPIC + TOPIC_STATUS_SEPARATOR + FOO_CONNECTOR;
ArgumentCaptor<byte[]> valueCaptor = ArgumentCaptor.forClass(byte[].class);
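        // ack the send and immediately read the record back into the store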
doAnswer(invocation -> {
((Callback) invocation.getArgument(2)).onCompletion(null, null);
ConsumerRecord<String, byte[]> statusRecord = new ConsumerRecord<>(STATUS_TOPIC, 0, 0, secondKey, valueCaptor.getValue());
store.read(statusRecord);
return null;
}).when(kafkaBasedLog).send(eq(secondKey), valueCaptor.capture(), any(Callback.class));
byte[] value = store.serializeTopicStatus(firstTopicStatus);
ConsumerRecord<String, byte[]> statusRecord = new ConsumerRecord<>(STATUS_TOPIC, 0, 0, firstKey, value);
store.read(statusRecord);
store.put(secondTopicStatus);
        // verify that the captured value deserializes back to the original status
assertEquals(secondTopicStatus, store.parseTopicStatus(valueCaptor.getValue()));
assertEquals(firstTopicStatus, store.getTopic(FOO_CONNECTOR, FOO_TOPIC));
assertEquals(secondTopicStatus, store.getTopic(FOO_CONNECTOR, BAR_TOPIC));
assertEquals(new HashSet<>(Arrays.asList(firstTopicStatus, secondTopicStatus)), new HashSet<>(store.getAllTopics(FOO_CONNECTOR)));
}
}