blob: 9ced632b9067643b46a8a90fc56bd112e7ed0224 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.runtime;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.health.ConnectorType;
import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.sink.SinkConnectorContext;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceConnectorContext;
import org.apache.kafka.connect.storage.CloseableOffsetStorageReader;
import org.apache.kafka.connect.storage.ConnectorOffsetBackingStore;
import org.apache.kafka.connect.util.Callback;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.ArgumentCaptor;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
import org.mockito.quality.Strictness;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
@RunWith(Parameterized.class)
public class WorkerConnectorTest {
private static final String VERSION = "1.1";
public static final String CONNECTOR = "connector";
public static final Map<String, String> CONFIG = new HashMap<>();
static {
CONFIG.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, TestConnector.class.getName());
CONFIG.put(ConnectorConfig.NAME_CONFIG, CONNECTOR);
CONFIG.put(SinkConnectorConfig.TOPICS_CONFIG, "my-topic");
}
public ConnectorConfig connectorConfig;
public MockConnectMetrics metrics;
@Rule
public MockitoRule rule = MockitoJUnit.rule().strictness(Strictness.STRICT_STUBS);
@Mock private Plugins plugins;
@Mock private CloseableConnectorContext ctx;
@Mock private ConnectorStatus.Listener listener;
@Mock private ClassLoader classLoader;
private final ConnectorType connectorType;
private final Connector connector;
private final CloseableOffsetStorageReader offsetStorageReader;
private final ConnectorOffsetBackingStore offsetStore;
@Parameterized.Parameters
public static Collection<ConnectorType> parameters() {
return Arrays.asList(ConnectorType.SOURCE, ConnectorType.SINK);
}
public WorkerConnectorTest(ConnectorType connectorType) {
this.connectorType = connectorType;
switch (connectorType) {
case SINK:
this.connector = mock(SinkConnector.class);
this.offsetStorageReader = null;
this.offsetStore = null;
break;
case SOURCE:
this.connector = mock(SourceConnector.class);
this.offsetStorageReader = mock(CloseableOffsetStorageReader.class);
this.offsetStore = mock(ConnectorOffsetBackingStore.class);
break;
default:
throw new IllegalStateException("Unexpected connector type: " + connectorType);
}
}
@Before
public void setup() {
connectorConfig = new ConnectorConfig(plugins, CONFIG);
metrics = new MockConnectMetrics();
}
@After
public void tearDown() {
if (metrics != null) metrics.stop();
}
@Test
public void testInitializeFailure() {
RuntimeException exception = new RuntimeException();
when(connector.version()).thenReturn(VERSION);
doThrow(exception).when(connector).initialize(any());
WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader);
workerConnector.initialize();
assertFailedMetric(workerConnector);
workerConnector.shutdown();
workerConnector.doShutdown();
assertDestroyedMetric(workerConnector);
verifyInitialize();
verify(listener).onFailure(CONNECTOR, exception);
verifyCleanShutdown(false);
}
@Test
public void testFailureIsFinalState() {
RuntimeException exception = new RuntimeException();
when(connector.version()).thenReturn(VERSION);
doThrow(exception).when(connector).initialize(any());
Callback<TargetState> onStateChange = mockCallback();
WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader);
workerConnector.initialize();
assertFailedMetric(workerConnector);
workerConnector.doTransitionTo(TargetState.STARTED, onStateChange);
assertFailedMetric(workerConnector);
workerConnector.shutdown();
workerConnector.doShutdown();
assertDestroyedMetric(workerConnector);
verifyInitialize();
verify(listener).onFailure(CONNECTOR, exception);
// expect no call to onStartup() after failure
verifyCleanShutdown(false);
verify(onStateChange).onCompletion(any(Exception.class), isNull());
verifyNoMoreInteractions(onStateChange);
}
@Test
public void testStartupAndShutdown() {
when(connector.version()).thenReturn(VERSION);
Callback<TargetState> onStateChange = mockCallback();
WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader);
workerConnector.initialize();
assertInitializedMetric(workerConnector);
workerConnector.doTransitionTo(TargetState.STARTED, onStateChange);
assertRunningMetric(workerConnector);
workerConnector.shutdown();
workerConnector.doShutdown();
assertDestroyedMetric(workerConnector);
verifyInitialize();
verify(connector).start(CONFIG);
verify(listener).onStartup(CONNECTOR);
verifyCleanShutdown(true);
verify(onStateChange).onCompletion(isNull(), eq(TargetState.STARTED));
verifyNoMoreInteractions(onStateChange);
}
@Test
public void testStartupAndPause() {
when(connector.version()).thenReturn(VERSION);
Callback<TargetState> onStateChange = mockCallback();
WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader);
workerConnector.initialize();
workerConnector.doTransitionTo(TargetState.STARTED, onStateChange);
assertRunningMetric(workerConnector);
workerConnector.doTransitionTo(TargetState.PAUSED, onStateChange);
assertPausedMetric(workerConnector);
workerConnector.shutdown();
workerConnector.doShutdown();
assertDestroyedMetric(workerConnector);
verifyInitialize();
verify(connector).start(CONFIG);
verify(listener).onStartup(CONNECTOR);
verify(listener).onPause(CONNECTOR);
verifyCleanShutdown(true);
InOrder inOrder = inOrder(onStateChange);
inOrder.verify(onStateChange).onCompletion(isNull(), eq(TargetState.STARTED));
inOrder.verify(onStateChange).onCompletion(isNull(), eq(TargetState.PAUSED));
verifyNoMoreInteractions(onStateChange);
}
@Test
public void testStartupAndStop() {
when(connector.version()).thenReturn(VERSION);
Callback<TargetState> onStateChange = mockCallback();
WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader);
workerConnector.initialize();
assertInitializedMetric(workerConnector);
workerConnector.doTransitionTo(TargetState.STARTED, onStateChange);
assertRunningMetric(workerConnector);
workerConnector.doTransitionTo(TargetState.STOPPED, onStateChange);
assertStoppedMetric(workerConnector);
workerConnector.shutdown();
workerConnector.doShutdown();
assertDestroyedMetric(workerConnector);
verifyInitialize();
verify(connector).start(CONFIG);
verify(listener).onStartup(CONNECTOR);
verify(listener).onStop(CONNECTOR);
verifyCleanShutdown(true);
InOrder inOrder = inOrder(onStateChange);
inOrder.verify(onStateChange).onCompletion(isNull(), eq(TargetState.STARTED));
inOrder.verify(onStateChange).onCompletion(isNull(), eq(TargetState.STOPPED));
verifyNoMoreInteractions(onStateChange);
}
@Test
public void testOnResume() {
when(connector.version()).thenReturn(VERSION);
Callback<TargetState> onStateChange = mockCallback();
WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader);
workerConnector.initialize();
assertInitializedMetric(workerConnector);
workerConnector.doTransitionTo(TargetState.PAUSED, onStateChange);
assertPausedMetric(workerConnector);
workerConnector.doTransitionTo(TargetState.STARTED, onStateChange);
assertRunningMetric(workerConnector);
workerConnector.shutdown();
workerConnector.doShutdown();
assertDestroyedMetric(workerConnector);
verifyInitialize();
verify(listener).onPause(CONNECTOR);
verify(connector).start(CONFIG);
verify(listener).onResume(CONNECTOR);
verifyCleanShutdown(true);
InOrder inOrder = inOrder(onStateChange);
inOrder.verify(onStateChange).onCompletion(isNull(), eq(TargetState.PAUSED));
inOrder.verify(onStateChange).onCompletion(isNull(), eq(TargetState.STARTED));
verifyNoMoreInteractions(onStateChange);
}
@Test
public void testStartupPaused() {
when(connector.version()).thenReturn(VERSION);
Callback<TargetState> onStateChange = mockCallback();
WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader);
workerConnector.initialize();
assertInitializedMetric(workerConnector);
workerConnector.doTransitionTo(TargetState.PAUSED, onStateChange);
assertPausedMetric(workerConnector);
workerConnector.shutdown();
workerConnector.doShutdown();
assertDestroyedMetric(workerConnector);
verifyInitialize();
// connector never gets started
verify(listener).onPause(CONNECTOR);
verifyCleanShutdown(false);
verify(onStateChange).onCompletion(isNull(), eq(TargetState.PAUSED));
verifyNoMoreInteractions(onStateChange);
}
@Test
public void testStartupStopped() {
when(connector.version()).thenReturn(VERSION);
Callback<TargetState> onStateChange = mockCallback();
WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader);
workerConnector.initialize();
assertInitializedMetric(workerConnector);
workerConnector.doTransitionTo(TargetState.STOPPED, onStateChange);
assertStoppedMetric(workerConnector);
workerConnector.shutdown();
workerConnector.doShutdown();
assertDestroyedMetric(workerConnector);
verifyInitialize();
// connector never gets started
verify(listener).onStop(CONNECTOR);
verifyCleanShutdown(false);
verify(onStateChange).onCompletion(isNull(), eq(TargetState.STOPPED));
verifyNoMoreInteractions(onStateChange);
}
@Test
public void testStartupFailure() {
RuntimeException exception = new RuntimeException();
when(connector.version()).thenReturn(VERSION);
doThrow(exception).when(connector).start(CONFIG);
Callback<TargetState> onStateChange = mockCallback();
WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader);
workerConnector.initialize();
assertInitializedMetric(workerConnector);
workerConnector.doTransitionTo(TargetState.STARTED, onStateChange);
assertFailedMetric(workerConnector);
workerConnector.shutdown();
workerConnector.doShutdown();
assertDestroyedMetric(workerConnector);
verifyInitialize();
verify(connector).start(CONFIG);
verify(listener).onFailure(CONNECTOR, exception);
verifyCleanShutdown(false);
verify(onStateChange).onCompletion(any(Exception.class), isNull());
verifyNoMoreInteractions(onStateChange);
}
@Test
public void testStopFailure() {
RuntimeException exception = new RuntimeException();
when(connector.version()).thenReturn(VERSION);
// Fail during the first call to stop, then succeed for the next attempt
doThrow(exception).doNothing().when(connector).stop();
Callback<TargetState> onFirstStateChange = mockCallback();
Callback<TargetState> onSecondStateChange = mockCallback();
Callback<TargetState> onThirdStateChange = mockCallback();
WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader);
workerConnector.initialize();
assertInitializedMetric(workerConnector);
workerConnector.doTransitionTo(TargetState.STARTED, onFirstStateChange);
assertRunningMetric(workerConnector);
workerConnector.doTransitionTo(TargetState.STOPPED, onSecondStateChange);
assertStoppedMetric(workerConnector);
workerConnector.doTransitionTo(TargetState.STARTED, onThirdStateChange);
assertRunningMetric(workerConnector);
workerConnector.shutdown();
workerConnector.doShutdown();
assertDestroyedMetric(workerConnector);
verifyInitialize();
verify(connector, times(2)).start(CONFIG);
verify(listener).onStartup(CONNECTOR);
verify(listener).onResume(CONNECTOR);
verify(listener).onStop(CONNECTOR);
verify(onFirstStateChange).onCompletion(isNull(), eq(TargetState.STARTED));
verifyNoMoreInteractions(onFirstStateChange);
// We swallow failures when transitioning to the STOPPED state
verify(onSecondStateChange).onCompletion(isNull(), eq(TargetState.STOPPED));
verifyNoMoreInteractions(onSecondStateChange);
verify(onThirdStateChange).onCompletion(isNull(), eq(TargetState.STARTED));
verifyNoMoreInteractions(onThirdStateChange);
verifyShutdown(2, true, true);
verifyNoMoreInteractions(listener);
}
@Test
public void testShutdownFailure() {
RuntimeException exception = new RuntimeException();
when(connector.version()).thenReturn(VERSION);
doThrow(exception).when(connector).stop();
Callback<TargetState> onStateChange = mockCallback();
WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader);
workerConnector.initialize();
assertInitializedMetric(workerConnector);
workerConnector.doTransitionTo(TargetState.STARTED, onStateChange);
assertRunningMetric(workerConnector);
workerConnector.shutdown();
workerConnector.doShutdown();
assertFailedMetric(workerConnector);
verifyInitialize();
verify(connector).start(CONFIG);
verify(listener).onStartup(CONNECTOR);
verify(onStateChange).onCompletion(isNull(), eq(TargetState.STARTED));
verifyNoMoreInteractions(onStateChange);
verify(listener).onFailure(CONNECTOR, exception);
verifyShutdown(false, true);
}
@Test
public void testTransitionStartedToStarted() {
when(connector.version()).thenReturn(VERSION);
Callback<TargetState> onStateChange = mockCallback();
WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader);
workerConnector.initialize();
assertInitializedMetric(workerConnector);
workerConnector.doTransitionTo(TargetState.STARTED, onStateChange);
assertRunningMetric(workerConnector);
workerConnector.doTransitionTo(TargetState.STARTED, onStateChange);
assertRunningMetric(workerConnector);
workerConnector.shutdown();
workerConnector.doShutdown();
assertDestroyedMetric(workerConnector);
verifyInitialize();
verify(connector).start(CONFIG);
// expect only one call to onStartup()
verify(listener).onStartup(CONNECTOR);
verifyCleanShutdown(true);
verify(onStateChange, times(2)).onCompletion(isNull(), eq(TargetState.STARTED));
verifyNoMoreInteractions(onStateChange);
}
@Test
public void testTransitionPausedToPaused() {
when(connector.version()).thenReturn(VERSION);
Callback<TargetState> onStateChange = mockCallback();
WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader);
workerConnector.initialize();
assertInitializedMetric(workerConnector);
workerConnector.doTransitionTo(TargetState.STARTED, onStateChange);
assertRunningMetric(workerConnector);
workerConnector.doTransitionTo(TargetState.PAUSED, onStateChange);
assertPausedMetric(workerConnector);
workerConnector.doTransitionTo(TargetState.PAUSED, onStateChange);
assertPausedMetric(workerConnector);
workerConnector.shutdown();
workerConnector.doShutdown();
assertDestroyedMetric(workerConnector);
verifyInitialize();
verify(connector).start(CONFIG);
verify(listener).onStartup(CONNECTOR);
verify(listener).onPause(CONNECTOR);
verifyCleanShutdown(true);
InOrder inOrder = inOrder(onStateChange);
inOrder.verify(onStateChange).onCompletion(isNull(), eq(TargetState.STARTED));
inOrder.verify(onStateChange, times(2)).onCompletion(isNull(), eq(TargetState.PAUSED));
verifyNoMoreInteractions(onStateChange);
}
@Test
public void testTransitionStoppedToStopped() {
when(connector.version()).thenReturn(VERSION);
Callback<TargetState> onStateChange = mockCallback();
WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader);
workerConnector.initialize();
assertInitializedMetric(workerConnector);
workerConnector.doTransitionTo(TargetState.STARTED, onStateChange);
assertRunningMetric(workerConnector);
workerConnector.doTransitionTo(TargetState.STOPPED, onStateChange);
assertStoppedMetric(workerConnector);
workerConnector.doTransitionTo(TargetState.STOPPED, onStateChange);
assertStoppedMetric(workerConnector);
workerConnector.shutdown();
workerConnector.doShutdown();
assertDestroyedMetric(workerConnector);
verifyInitialize();
verify(connector).start(CONFIG);
verify(listener).onStartup(CONNECTOR);
verify(listener).onStop(CONNECTOR);
verifyCleanShutdown(true);
InOrder inOrder = inOrder(onStateChange);
inOrder.verify(onStateChange).onCompletion(isNull(), eq(TargetState.STARTED));
inOrder.verify(onStateChange, times(2)).onCompletion(isNull(), eq(TargetState.STOPPED));
verifyNoMoreInteractions(onStateChange);
}
@Test
public void testFailConnectorThatIsNeitherSourceNorSink() {
Connector badConnector = mock(Connector.class);
when(badConnector.version()).thenReturn(VERSION);
WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, badConnector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader);
workerConnector.initialize();
verify(badConnector).version();
ArgumentCaptor<Throwable> exceptionCapture = ArgumentCaptor.forClass(Throwable.class);
verify(listener).onFailure(eq(CONNECTOR), exceptionCapture.capture());
Throwable e = exceptionCapture.getValue();
assertTrue(e instanceof ConnectException);
assertTrue(e.getMessage().contains("must be a subclass of"));
}
protected void assertFailedMetric(WorkerConnector workerConnector) {
assertFalse(workerConnector.metrics().isUnassigned());
assertTrue(workerConnector.metrics().isFailed());
assertFalse(workerConnector.metrics().isStopped());
assertFalse(workerConnector.metrics().isPaused());
assertFalse(workerConnector.metrics().isRunning());
}
protected void assertStoppedMetric(WorkerConnector workerConnector) {
assertFalse(workerConnector.metrics().isUnassigned());
assertFalse(workerConnector.metrics().isFailed());
assertFalse(workerConnector.metrics().isPaused());
assertTrue(workerConnector.metrics().isStopped());
assertFalse(workerConnector.metrics().isRunning());
}
protected void assertPausedMetric(WorkerConnector workerConnector) {
assertFalse(workerConnector.metrics().isUnassigned());
assertFalse(workerConnector.metrics().isFailed());
assertFalse(workerConnector.metrics().isStopped());
assertTrue(workerConnector.metrics().isPaused());
assertFalse(workerConnector.metrics().isRunning());
}
protected void assertRunningMetric(WorkerConnector workerConnector) {
assertFalse(workerConnector.metrics().isUnassigned());
assertFalse(workerConnector.metrics().isFailed());
assertFalse(workerConnector.metrics().isStopped());
assertFalse(workerConnector.metrics().isPaused());
assertTrue(workerConnector.metrics().isRunning());
}
protected void assertDestroyedMetric(WorkerConnector workerConnector) {
assertTrue(workerConnector.metrics().isUnassigned());
assertFalse(workerConnector.metrics().isFailed());
assertFalse(workerConnector.metrics().isStopped());
assertFalse(workerConnector.metrics().isPaused());
assertFalse(workerConnector.metrics().isRunning());
}
protected void assertInitializedMetric(WorkerConnector workerConnector) {
String expectedType;
switch (connectorType) {
case SINK:
expectedType = "sink";
break;
case SOURCE:
expectedType = "source";
break;
default:
throw new IllegalStateException("Unexpected connector type: " + connectorType);
}
assertInitializedMetric(workerConnector, expectedType);
}
protected void assertInitializedMetric(WorkerConnector workerConnector, String expectedType) {
assertTrue(workerConnector.metrics().isUnassigned());
assertFalse(workerConnector.metrics().isFailed());
assertFalse(workerConnector.metrics().isStopped());
assertFalse(workerConnector.metrics().isPaused());
assertFalse(workerConnector.metrics().isRunning());
MetricGroup metricGroup = workerConnector.metrics().metricGroup();
String status = metrics.currentMetricValueAsString(metricGroup, "status");
String type = metrics.currentMetricValueAsString(metricGroup, "connector-type");
String clazz = metrics.currentMetricValueAsString(metricGroup, "connector-class");
String version = metrics.currentMetricValueAsString(metricGroup, "connector-version");
assertEquals(expectedType, type);
assertNotNull(clazz);
assertEquals(VERSION, version);
}
@SuppressWarnings("unchecked")
private Callback<TargetState> mockCallback() {
return mock(Callback.class);
}
private void verifyInitialize() {
verify(connector).version();
if (connectorType == ConnectorType.SOURCE) {
verify(offsetStore).start();
verify(connector).initialize(any(SourceConnectorContext.class));
} else if (connectorType == ConnectorType.SINK) {
verify(connector).initialize(any(SinkConnectorContext.class));
}
}
private void verifyCleanShutdown(boolean started) {
verifyShutdown(true, started);
}
private void verifyShutdown(boolean clean, boolean started) {
verifyShutdown(1, clean, started);
}
private void verifyShutdown(int connectorStops, boolean clean, boolean started) {
verify(ctx).close();
if (connectorType == ConnectorType.SOURCE) {
verify(offsetStorageReader).close();
verify(offsetStore).stop();
}
if (clean) {
verify(listener).onShutdown(CONNECTOR);
}
if (started) {
verify(connector, times(connectorStops)).stop();
}
}
private static abstract class TestConnector extends Connector {
}
}