/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.storage;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.runtime.ConnectorStatus;
import org.apache.kafka.connect.runtime.TaskStatus;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.apache.kafka.connect.util.TopicAdmin;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.junit.MockitoJUnitRunner;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;
import static org.apache.kafka.clients.CommonClientConfigs.CLIENT_ID_CONFIG;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
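/**
 * Unit tests for {@link KafkaStatusBackingStore}. The underlying {@link KafkaBasedLog} and
 * {@link Converter} are mocked, so reads from and writes to the status topic are simulated
 * by invoking the store's callbacks directly rather than by talking to a real Kafka cluster.
 */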
@SuppressWarnings("unchecked")
@RunWith(MockitoJUnitRunner.StrictStubs.class)
public class KafkaStatusBackingStoreTest {
private static final String STATUS_TOPIC = "status-topic";
private static final String WORKER_ID = "localhost:8083";
private static final String CONNECTOR = "conn";
private static final ConnectorTaskId TASK = new ConnectorTaskId(CONNECTOR, 0);
private KafkaStatusBackingStore store;
private final KafkaBasedLog<String, byte[]> kafkaBasedLog = mock(KafkaBasedLog.class);
Converter converter = mock(Converter.class);
WorkerConfig workerConfig = mock(WorkerConfig.class);
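// Recreate the store before each test; the topic admin supplier returns null because the
// mocked KafkaBasedLog never touches a real cluster.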
@Before
public void setup() {
store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, () -> null, kafkaBasedLog);
}
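// A null or blank status topic name should be rejected when the store is configured.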
@Test
public void misconfigurationOfStatusBackingStore() {
when(workerConfig.getString(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG)).thenReturn(null);
Exception e = assertThrows(ConfigException.class, () -> store.configure(workerConfig));
assertEquals("Must specify topic for connector status.", e.getMessage());
when(workerConfig.getString(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG)).thenReturn(" ");
e = assertThrows(ConfigException.class, () -> store.configure(workerConfig));
assertEquals("Must specify topic for connector status.", e.getMessage());
}
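// A successful write does not update the local cache; the new state only becomes visible
// once it has been read back from the status topic.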
@Test
public void putConnectorState() {
byte[] value = new byte[0];
when(converter.fromConnectData(eq(STATUS_TOPIC), any(Schema.class), any(Struct.class))).thenReturn(value);
doAnswer(invocation -> {
((Callback) invocation.getArgument(2)).onCompletion(null, null);
return null;
}).when(kafkaBasedLog).send(eq("status-connector-conn"), eq(value), any(Callback.class));
ConnectorStatus status = new ConnectorStatus(CONNECTOR, ConnectorStatus.State.RUNNING, WORKER_ID, 0);
store.put(status);
// state is not visible until read back from the log
assertNull(store.get(CONNECTOR));
}
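// A retriable failure (e.g. a TimeoutException) on the first send should be retried; the
// second attempt succeeds.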
@Test
public void putConnectorStateRetriableFailure() {
byte[] value = new byte[0];
when(converter.fromConnectData(eq(STATUS_TOPIC), any(Schema.class), any(Struct.class))).thenReturn(value);
doAnswer(invocation -> {
((Callback) invocation.getArgument(2)).onCompletion(null, new TimeoutException());
return null;
}).doAnswer(invocation -> {
((Callback) invocation.getArgument(2)).onCompletion(null, null);
return null;
})
.when(kafkaBasedLog).send(eq("status-connector-conn"), eq(value), any(Callback.class));
ConnectorStatus status = new ConnectorStatus(CONNECTOR, ConnectorStatus.State.RUNNING, WORKER_ID, 0);
store.put(status);
// state is not visible until read back from the log
assertNull(store.get(CONNECTOR));
}
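// A non-retriable failure on send is logged and ignored rather than retried.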
@Test
public void putConnectorStateNonRetriableFailure() {
byte[] value = new byte[0];
when(converter.fromConnectData(eq(STATUS_TOPIC), any(Schema.class), any(Struct.class))).thenReturn(value);
doAnswer(invocation -> {
((Callback) invocation.getArgument(2)).onCompletion(null, new UnknownServerException());
return null;
}).when(kafkaBasedLog).send(eq("status-connector-conn"), eq(value), any(Callback.class));
// the error is logged and ignored
ConnectorStatus status = new ConnectorStatus(CONNECTOR, ConnectorStatus.State.RUNNING, WORKER_ID, 0);
store.put(status);
// state is not visible until read back from the log
assertNull(store.get(CONNECTOR));
}
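// putSafe must not overwrite a status written by a different worker with a newer generation.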
@Test
public void putSafeConnectorIgnoresStaleStatus() {
byte[] value = new byte[0];
String otherWorkerId = "anotherhost:8083";
// the persisted status came from a different host and has a newer generation
Map<String, Object> statusMap = new HashMap<>();
statusMap.put("worker_id", otherWorkerId);
statusMap.put("state", "RUNNING");
statusMap.put("generation", 1L);
when(converter.toConnectData(STATUS_TOPIC, value)).thenReturn(new SchemaAndValue(null, statusMap));
store.read(consumerRecord(0, "status-connector-conn", value));
store.putSafe(new ConnectorStatus(CONNECTOR, ConnectorStatus.State.UNASSIGNED, WORKER_ID, 0));
verify(kafkaBasedLog, never()).send(anyString(), any(), any(Callback.class));
ConnectorStatus status = new ConnectorStatus(CONNECTOR, ConnectorStatus.State.RUNNING, otherWorkerId, 1);
assertEquals(status, store.get(CONNECTOR));
}
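// When no status has been observed yet, putSafe should serialize and send the new status.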
@Test
public void putSafeWithNoPreviousValueIsPropagated() {
final byte[] value = new byte[0];
ArgumentCaptor<Struct> captor = ArgumentCaptor.forClass(Struct.class);
when(converter.fromConnectData(eq(STATUS_TOPIC), any(Schema.class), any(Struct.class))).thenReturn(value);
final ConnectorStatus status = new ConnectorStatus(CONNECTOR, ConnectorStatus.State.FAILED, WORKER_ID, 0);
store.putSafe(status);
verify(kafkaBasedLog).send(eq("status-connector-" + CONNECTOR), eq(value), any(Callback.class));
verify(converter).fromConnectData(eq(STATUS_TOPIC), any(Schema.class), captor.capture());
assertEquals(status.state().toString(), captor.getValue().get(KafkaStatusBackingStore.STATE_KEY_NAME));
assertEquals(status.workerId(), captor.getValue().get(KafkaStatusBackingStore.WORKER_ID_KEY_NAME));
assertEquals(status.generation(), captor.getValue().get(KafkaStatusBackingStore.GENERATION_KEY_NAME));
}
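// putSafe may overwrite a status that this same worker wrote previously, even though the
// cached entry carries a newer generation.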
@Test
public void putSafeOverridesValueSetBySameWorker() {
final byte[] value = new byte[0];
// the persisted status came from the same host, but has a newer generation
Map<String, Object> firstStatusRead = new HashMap<>();
firstStatusRead.put("worker_id", WORKER_ID);
firstStatusRead.put("state", "RUNNING");
firstStatusRead.put("generation", 1L);
Map<String, Object> secondStatusRead = new HashMap<>();
secondStatusRead.put("worker_id", WORKER_ID);
secondStatusRead.put("state", "UNASSIGNED");
secondStatusRead.put("generation", 0L);
when(converter.toConnectData(STATUS_TOPIC, value))
.thenReturn(new SchemaAndValue(null, firstStatusRead))
.thenReturn(new SchemaAndValue(null, secondStatusRead));
when(converter.fromConnectData(eq(STATUS_TOPIC), any(Schema.class), any(Struct.class)))
.thenReturn(value);
doAnswer(invocation -> {
((Callback) invocation.getArgument(2)).onCompletion(null, null);
store.read(consumerRecord(1, "status-connector-conn", value));
return null;
}).when(kafkaBasedLog).send(eq("status-connector-conn"), eq(value), any(Callback.class));
store.read(consumerRecord(0, "status-connector-conn", value));
store.putSafe(new ConnectorStatus(CONNECTOR, ConnectorStatus.State.UNASSIGNED, WORKER_ID, 0));
ConnectorStatus status = new ConnectorStatus(CONNECTOR, ConnectorStatus.State.UNASSIGNED, WORKER_ID, 0);
assertEquals(status, store.get(CONNECTOR));
}
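// Unlike putSafe, an unconditional put overwrites a status written by a different worker
// regardless of its generation.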
@Test
public void putConnectorStateShouldOverride() {
final byte[] value = new byte[0];
String otherWorkerId = "anotherhost:8083";
// the persisted status came from a different host and has a newer generation
Map<String, Object> firstStatusRead = new HashMap<>();
firstStatusRead.put("worker_id", otherWorkerId);
firstStatusRead.put("state", "RUNNING");
firstStatusRead.put("generation", 1L);
Map<String, Object> secondStatusRead = new HashMap<>();
secondStatusRead.put("worker_id", WORKER_ID);
secondStatusRead.put("state", "UNASSIGNED");
secondStatusRead.put("generation", 0L);
when(converter.toConnectData(STATUS_TOPIC, value))
.thenReturn(new SchemaAndValue(null, firstStatusRead))
.thenReturn(new SchemaAndValue(null, secondStatusRead));
when(converter.fromConnectData(eq(STATUS_TOPIC), any(Schema.class), any(Struct.class)))
.thenReturn(value);
doAnswer(invocation -> {
((Callback) invocation.getArgument(2)).onCompletion(null, null);
store.read(consumerRecord(1, "status-connector-conn", value));
return null;
}).when(kafkaBasedLog).send(eq("status-connector-conn"), eq(value), any(Callback.class));
store.read(consumerRecord(0, "status-connector-conn", value));
ConnectorStatus status = new ConnectorStatus(CONNECTOR, ConnectorStatus.State.UNASSIGNED, WORKER_ID, 0);
store.put(status);
assertEquals(status, store.get(CONNECTOR));
}
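// Reading a connector status record from the topic should populate the local cache.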
@Test
public void readConnectorState() {
byte[] value = new byte[0];
Map<String, Object> statusMap = new HashMap<>();
statusMap.put("worker_id", WORKER_ID);
statusMap.put("state", "RUNNING");
statusMap.put("generation", 0L);
when(converter.toConnectData(STATUS_TOPIC, value))
.thenReturn(new SchemaAndValue(null, statusMap));
store.read(consumerRecord(0, "status-connector-conn", value));
ConnectorStatus status = new ConnectorStatus(CONNECTOR, ConnectorStatus.State.RUNNING, WORKER_ID, 0);
assertEquals(status, store.get(CONNECTOR));
}
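// As with connector statuses, a task status write is only visible after it has been read
// back from the topic.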
@Test
public void putTaskState() {
byte[] value = new byte[0];
when(converter.fromConnectData(eq(STATUS_TOPIC), any(Schema.class), any(Struct.class)))
.thenReturn(value);
doAnswer(invocation -> {
((Callback) invocation.getArgument(2)).onCompletion(null, null);
store.read(consumerRecord(1, "status-connector-conn", value));
return null;
}).when(kafkaBasedLog).send(eq("status-task-conn-0"), eq(value), any(Callback.class));
TaskStatus status = new TaskStatus(TASK, TaskStatus.State.RUNNING, WORKER_ID, 0);
store.put(status);
// state is not visible until read back from the log
assertNull(store.get(TASK));
}
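// Reading a task status record from the topic should populate the local cache.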
@Test
public void readTaskState() {
byte[] value = new byte[0];
Map<String, Object> statusMap = new HashMap<>();
statusMap.put("worker_id", WORKER_ID);
statusMap.put("state", "RUNNING");
statusMap.put("generation", 0L);
when(converter.toConnectData(STATUS_TOPIC, value))
.thenReturn(new SchemaAndValue(null, statusMap));
store.read(consumerRecord(0, "status-task-conn-0", value));
TaskStatus status = new TaskStatus(TASK, TaskStatus.State.RUNNING, WORKER_ID, 0);
assertEquals(status, store.get(TASK));
}
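// A stale status from an older generation must not clobber a newer-generation status, but
// an unconditional put from an unrelated worker always wins.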
@Test
public void readTaskStateShouldIgnoreStaleStatusesFromOtherWorkers() {
byte[] value = new byte[0];
String otherWorkerId = "anotherhost:8083";
String yetAnotherWorkerId = "yetanotherhost:8083";
// One worker sends a RUNNING status with the most recent generation
Map<String, Object> firstStatusRead = new HashMap<>();
firstStatusRead.put("worker_id", otherWorkerId);
firstStatusRead.put("state", "RUNNING");
firstStatusRead.put("generation", 10L);
// Another worker from an older generation then produces an UNASSIGNED status before it
// could read the newer RUNNING status above.
Map<String, Object> secondStatusRead = new HashMap<>();
secondStatusRead.put("worker_id", WORKER_ID);
secondStatusRead.put("state", "UNASSIGNED");
secondStatusRead.put("generation", 9L);
Map<String, Object> thirdStatusRead = new HashMap<>();
thirdStatusRead.put("worker_id", yetAnotherWorkerId);
thirdStatusRead.put("state", "RUNNING");
thirdStatusRead.put("generation", 1L);
when(converter.toConnectData(STATUS_TOPIC, value))
.thenReturn(new SchemaAndValue(null, firstStatusRead))
.thenReturn(new SchemaAndValue(null, secondStatusRead))
.thenReturn(new SchemaAndValue(null, thirdStatusRead));
when(converter.fromConnectData(eq(STATUS_TOPIC), any(Schema.class), any(Struct.class)))
.thenReturn(value);
doAnswer(invocation -> {
((Callback) invocation.getArgument(2)).onCompletion(null, null);
store.read(consumerRecord(2, "status-task-conn-0", value));
return null;
}).when(kafkaBasedLog).send(eq("status-task-conn-0"), eq(value), any(Callback.class));
store.read(consumerRecord(0, "status-task-conn-0", value));
store.read(consumerRecord(1, "status-task-conn-0", value));
// The latest task status should reflect RUNNING status from the newer generation
TaskStatus status = new TaskStatus(TASK, TaskStatus.State.RUNNING, otherWorkerId, 10);
assertEquals(status, store.get(TASK));
// This status comes from yet another worker that does not necessarily belong to the group above.
// In this case the status should be updated regardless of whatever status was present before.
TaskStatus latestStatus = new TaskStatus(TASK, TaskStatus.State.RUNNING, yetAnotherWorkerId, 1);
store.put(latestStatus);
assertEquals(latestStatus, store.get(TASK));
}
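// A tombstone (null value) for a connector key should remove the connector and its tasks
// from the store.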
@Test
public void deleteConnectorState() {
final byte[] value = new byte[0];
Map<String, Object> statusMap = new HashMap<>();
statusMap.put("worker_id", WORKER_ID);
statusMap.put("state", "RUNNING");
statusMap.put("generation", 0L);
when(converter.fromConnectData(eq(STATUS_TOPIC), any(Schema.class), any(Struct.class))).thenReturn(value);
when(converter.toConnectData(STATUS_TOPIC, value)).thenReturn(new SchemaAndValue(null, statusMap));
ConnectorStatus connectorStatus = new ConnectorStatus(CONNECTOR, ConnectorStatus.State.RUNNING, WORKER_ID, 0);
store.put(connectorStatus);
TaskStatus taskStatus = new TaskStatus(TASK, TaskStatus.State.RUNNING, WORKER_ID, 0);
store.put(taskStatus);
store.read(consumerRecord(0, "status-task-conn-0", value));
verify(kafkaBasedLog).send(eq("status-connector-" + CONNECTOR), eq(value), any(Callback.class));
verify(kafkaBasedLog).send(eq("status-task-conn-0"), eq(value), any(Callback.class));
assertEquals(new HashSet<>(Collections.singletonList(CONNECTOR)), store.connectors());
assertEquals(new HashSet<>(Collections.singletonList(taskStatus)), new HashSet<>(store.getAll(CONNECTOR)));
store.read(consumerRecord(0, "status-connector-conn", null));
assertTrue(store.connectors().isEmpty());
assertTrue(store.getAll(CONNECTOR).isEmpty());
}
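// A tombstone (null value) for a task key should remove that task's status from the store.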
@Test
public void deleteTaskState() {
final byte[] value = new byte[0];
Map<String, Object> statusMap = new HashMap<>();
statusMap.put("worker_id", WORKER_ID);
statusMap.put("state", "RUNNING");
statusMap.put("generation", 0L);
when(converter.fromConnectData(eq(STATUS_TOPIC), any(Schema.class), any(Struct.class))).thenReturn(value);
when(converter.toConnectData(STATUS_TOPIC, value)).thenReturn(new SchemaAndValue(null, statusMap));
TaskStatus taskStatus = new TaskStatus(TASK, TaskStatus.State.RUNNING, WORKER_ID, 0);
store.put(taskStatus);
store.read(consumerRecord(0, "status-task-conn-0", value));
verify(kafkaBasedLog).send(eq("status-task-conn-0"), eq(value), any(Callback.class));
assertEquals(new HashSet<>(Collections.singletonList(taskStatus)), new HashSet<>(store.getAll(CONNECTOR)));
store.read(consumerRecord(0, "status-task-conn-0", null));
assertTrue(store.getAll(CONNECTOR).isEmpty());
}
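// The producer and consumer client IDs should be derived from the client ID base passed to
// the constructor.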
@Test
public void testClientIds() {
String clientIdBase = "test-client-id-";
Supplier<TopicAdmin> topicAdminSupplier = () -> mock(TopicAdmin.class);
ArgumentCaptor<Map<String, Object>> capturedProducerProps = ArgumentCaptor.forClass(Map.class);
ArgumentCaptor<Map<String, Object>> capturedConsumerProps = ArgumentCaptor.forClass(Map.class);
store = spy(new KafkaStatusBackingStore(new MockTime(), converter, topicAdminSupplier, clientIdBase));
KafkaBasedLog<String, byte[]> kafkaLog = mock(KafkaBasedLog.class);
doReturn(kafkaLog).when(store).createKafkaBasedLog(any(), capturedProducerProps.capture(),
capturedConsumerProps.capture(), any(),
any(), any(), any(), any());
when(workerConfig.getString(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG)).thenReturn("connect-statuses");
store.configure(workerConfig);
final String expectedClientId = clientIdBase + "statuses";
assertEquals(expectedClientId, capturedProducerProps.getValue().get(CLIENT_ID_CONFIG));
assertEquals(expectedClientId, capturedConsumerProps.getValue().get(CLIENT_ID_CONFIG));
}
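// Builds a ConsumerRecord on partition 0 of the status topic with the given offset, key,
// and value; a null value represents a tombstone.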
private static ConsumerRecord<String, byte[]> consumerRecord(long offset, String key, byte[] value) {
return new ConsumerRecord<>(STATUS_TOPIC, 0, offset, System.currentTimeMillis(),
TimestampType.CREATE_TIME, 0, 0, key, value, new RecordHeaders(), Optional.empty());
}
}