| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.kafka.connect.runtime; |
| |
| import org.apache.kafka.clients.CommonClientConfigs; |
| import org.apache.kafka.clients.consumer.ConsumerConfig; |
| import org.apache.kafka.clients.producer.KafkaProducer; |
| import org.apache.kafka.clients.producer.ProducerConfig; |
| import org.apache.kafka.common.Configurable; |
| import org.apache.kafka.common.config.AbstractConfig; |
| import org.apache.kafka.common.config.ConfigDef; |
| import org.apache.kafka.common.config.ConfigException; |
| import org.apache.kafka.common.utils.MockTime; |
| import org.apache.kafka.common.utils.Time; |
| import org.apache.kafka.connect.connector.Connector; |
| import org.apache.kafka.connect.connector.ConnectorContext; |
| import org.apache.kafka.connect.connector.Task; |
| import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy; |
| import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy; |
| import org.apache.kafka.connect.connector.policy.NoneConnectorClientConfigOverridePolicy; |
| import org.apache.kafka.connect.data.Schema; |
| import org.apache.kafka.connect.data.SchemaAndValue; |
| import org.apache.kafka.connect.errors.ConnectException; |
| import org.apache.kafka.connect.json.JsonConverter; |
| import org.apache.kafka.connect.json.JsonConverterConfig; |
| import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup; |
| import org.apache.kafka.connect.runtime.MockConnectMetrics.MockMetricsReporter; |
| import org.apache.kafka.connect.runtime.distributed.ClusterConfigState; |
| import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator; |
| import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader; |
| import org.apache.kafka.connect.runtime.isolation.PluginClassLoader; |
| import org.apache.kafka.connect.runtime.isolation.Plugins; |
| import org.apache.kafka.connect.runtime.isolation.Plugins.ClassLoaderUsage; |
| import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo; |
| import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; |
| import org.apache.kafka.connect.sink.SinkTask; |
| import org.apache.kafka.connect.source.SourceRecord; |
| import org.apache.kafka.connect.source.SourceTask; |
| import org.apache.kafka.connect.storage.Converter; |
| import org.apache.kafka.connect.storage.HeaderConverter; |
| import org.apache.kafka.connect.storage.OffsetBackingStore; |
| import org.apache.kafka.connect.storage.OffsetStorageReader; |
| import org.apache.kafka.connect.storage.OffsetStorageWriter; |
| import org.apache.kafka.connect.util.ConnectorTaskId; |
| import org.apache.kafka.connect.util.ThreadedTest; |
| import org.easymock.Capture; |
| import org.easymock.EasyMock; |
| import org.junit.Before; |
| import org.junit.Test; |
| import org.junit.runner.RunWith; |
| import org.powermock.api.easymock.PowerMock; |
| import org.powermock.api.easymock.annotation.Mock; |
| import org.powermock.api.easymock.annotation.MockNice; |
| import org.powermock.api.easymock.annotation.MockStrict; |
| import org.powermock.core.classloader.annotations.PowerMockIgnore; |
| import org.powermock.core.classloader.annotations.PrepareForTest; |
| import org.powermock.modules.junit4.PowerMockRunner; |
| |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.concurrent.ExecutorService; |
| |
| import static org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest.NOOP_OPERATOR; |
| import static org.easymock.EasyMock.anyObject; |
| import static org.easymock.EasyMock.eq; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertNotNull; |
| import static org.junit.Assert.assertNull; |
| import static org.junit.Assert.fail; |
| |
| @RunWith(PowerMockRunner.class) |
| @PrepareForTest({Worker.class, Plugins.class}) |
| @PowerMockIgnore("javax.management.*") |
| public class WorkerTest extends ThreadedTest { |
| |
| private static final String CONNECTOR_ID = "test-connector"; |
| private static final ConnectorTaskId TASK_ID = new ConnectorTaskId("job", 0); |
| private static final String WORKER_ID = "localhost:8083"; |
| private final ConnectorClientConfigOverridePolicy noneConnectorClientConfigOverridePolicy = new NoneConnectorClientConfigOverridePolicy(); |
| private final ConnectorClientConfigOverridePolicy allConnectorClientConfigOverridePolicy = new AllConnectorClientConfigOverridePolicy(); |
| |
| private Map<String, String> workerProps = new HashMap<>(); |
| private WorkerConfig config; |
| private Worker worker; |
| |
| private Map<String, String> defaultProducerConfigs = new HashMap<>(); |
| private Map<String, String> defaultConsumerConfigs = new HashMap<>(); |
| |
| @Mock |
| private Plugins plugins; |
| @Mock |
| private PluginClassLoader pluginLoader; |
| @Mock |
| private DelegatingClassLoader delegatingLoader; |
| @Mock |
| private OffsetBackingStore offsetBackingStore; |
| @MockStrict |
| private TaskStatus.Listener taskStatusListener; |
| @MockStrict |
| private ConnectorStatus.Listener connectorStatusListener; |
| |
| @Mock private Herder herder; |
| @Mock private Connector connector; |
| @Mock private ConnectorContext ctx; |
| @Mock private TestSourceTask task; |
| @Mock private WorkerSourceTask workerTask; |
| @Mock private Converter keyConverter; |
| @Mock private Converter valueConverter; |
| @Mock private Converter taskKeyConverter; |
| @Mock private Converter taskValueConverter; |
| @Mock private HeaderConverter taskHeaderConverter; |
| @Mock private ExecutorService executorService; |
| @MockNice private ConnectorConfig connectorConfig; |
| |
| @Before |
| public void setup() { |
| super.setup(); |
| workerProps.put("key.converter", "org.apache.kafka.connect.json.JsonConverter"); |
| workerProps.put("value.converter", "org.apache.kafka.connect.json.JsonConverter"); |
| workerProps.put("internal.key.converter", "org.apache.kafka.connect.json.JsonConverter"); |
| workerProps.put("internal.value.converter", "org.apache.kafka.connect.json.JsonConverter"); |
| workerProps.put("internal.key.converter.schemas.enable", "false"); |
| workerProps.put("internal.value.converter.schemas.enable", "false"); |
| workerProps.put("offset.storage.file.filename", "/tmp/connect.offsets"); |
| workerProps.put(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName()); |
| config = new StandaloneConfig(workerProps); |
| |
| defaultProducerConfigs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); |
| defaultProducerConfigs.put( |
| ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); |
| defaultProducerConfigs.put( |
| ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); |
| defaultProducerConfigs.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, Integer.toString(Integer.MAX_VALUE)); |
| defaultProducerConfigs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, Long.toString(Long.MAX_VALUE)); |
| defaultProducerConfigs.put(ProducerConfig.ACKS_CONFIG, "all"); |
| defaultProducerConfigs.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1"); |
| defaultProducerConfigs.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.toString(Integer.MAX_VALUE)); |
| |
| defaultConsumerConfigs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); |
| defaultConsumerConfigs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); |
| defaultConsumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); |
| defaultConsumerConfigs |
| .put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); |
| defaultConsumerConfigs |
| .put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); |
| |
| PowerMock.mockStatic(Plugins.class); |
| } |
| |
| @Test |
| public void testStartAndStopConnector() { |
| expectConverters(); |
| expectStartStorage(); |
| |
| // Create |
| EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(2); |
| EasyMock.expect(plugins.newConnector(WorkerTestConnector.class.getName())) |
| .andReturn(connector); |
| EasyMock.expect(connector.version()).andReturn("1.0"); |
| |
| Map<String, String> props = new HashMap<>(); |
| props.put(SinkConnectorConfig.TOPICS_CONFIG, "foo,bar"); |
| props.put(ConnectorConfig.TASKS_MAX_CONFIG, "1"); |
| props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID); |
| props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, WorkerTestConnector.class.getName()); |
| |
| EasyMock.expect(connector.version()).andReturn("1.0"); |
| |
| EasyMock.expect(plugins.compareAndSwapLoaders(connector)) |
| .andReturn(delegatingLoader) |
| .times(2); |
| connector.initialize(anyObject(ConnectorContext.class)); |
| EasyMock.expectLastCall(); |
| connector.start(props); |
| EasyMock.expectLastCall(); |
| |
| EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)) |
| .andReturn(pluginLoader).times(2); |
| |
| connectorStatusListener.onStartup(CONNECTOR_ID); |
| EasyMock.expectLastCall(); |
| |
| // Remove |
| connector.stop(); |
| EasyMock.expectLastCall(); |
| |
| connectorStatusListener.onShutdown(CONNECTOR_ID); |
| EasyMock.expectLastCall(); |
| |
| expectStopStorage(); |
| |
| PowerMock.replayAll(); |
| |
| worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, noneConnectorClientConfigOverridePolicy); |
| worker.start(); |
| |
| assertEquals(Collections.emptySet(), worker.connectorNames()); |
| worker.startConnector(CONNECTOR_ID, props, ctx, connectorStatusListener, TargetState.STARTED); |
| assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_ID)), worker.connectorNames()); |
| try { |
| worker.startConnector(CONNECTOR_ID, props, ctx, connectorStatusListener, TargetState.STARTED); |
| fail("Should have thrown exception when trying to add connector with same name."); |
| } catch (ConnectException e) { |
| // expected |
| } |
| assertStatistics(worker, 1, 0); |
| assertStartupStatistics(worker, 1, 0, 0, 0); |
| worker.stopConnector(CONNECTOR_ID); |
| assertStatistics(worker, 0, 0); |
| assertStartupStatistics(worker, 1, 0, 0, 0); |
| assertEquals(Collections.emptySet(), worker.connectorNames()); |
| // Nothing should be left, so this should effectively be a nop |
| worker.stop(); |
| assertStatistics(worker, 0, 0); |
| |
| PowerMock.verifyAll(); |
| } |
| |
| @Test |
| public void testStartConnectorFailure() { |
| expectConverters(); |
| expectStartStorage(); |
| |
| Map<String, String> props = new HashMap<>(); |
| props.put(SinkConnectorConfig.TOPICS_CONFIG, "foo,bar"); |
| props.put(ConnectorConfig.TASKS_MAX_CONFIG, "1"); |
| props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID); |
| props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "java.util.HashMap"); // Bad connector class name |
| |
| EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader); |
| EasyMock.expect(plugins.newConnector(EasyMock.anyString())) |
| .andThrow(new ConnectException("Failed to find Connector")); |
| |
| EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)) |
| .andReturn(pluginLoader); |
| |
| connectorStatusListener.onFailure( |
| EasyMock.eq(CONNECTOR_ID), |
| EasyMock.<ConnectException>anyObject() |
| ); |
| EasyMock.expectLastCall(); |
| |
| PowerMock.replayAll(); |
| |
| worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, noneConnectorClientConfigOverridePolicy); |
| worker.start(); |
| |
| assertStatistics(worker, 0, 0); |
| assertFalse(worker.startConnector(CONNECTOR_ID, props, ctx, connectorStatusListener, TargetState.STARTED)); |
| |
| assertStartupStatistics(worker, 1, 1, 0, 0); |
| assertEquals(Collections.emptySet(), worker.connectorNames()); |
| |
| assertStatistics(worker, 0, 0); |
| assertStartupStatistics(worker, 1, 1, 0, 0); |
| assertFalse(worker.stopConnector(CONNECTOR_ID)); |
| assertStatistics(worker, 0, 0); |
| assertStartupStatistics(worker, 1, 1, 0, 0); |
| |
| PowerMock.verifyAll(); |
| } |
| |
| @Test |
| public void testAddConnectorByAlias() { |
| expectConverters(); |
| expectStartStorage(); |
| |
| EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(2); |
| EasyMock.expect(plugins.newConnector("WorkerTestConnector")).andReturn(connector); |
| EasyMock.expect(connector.version()).andReturn("1.0"); |
| |
| Map<String, String> props = new HashMap<>(); |
| props.put(SinkConnectorConfig.TOPICS_CONFIG, "foo,bar"); |
| props.put(ConnectorConfig.TASKS_MAX_CONFIG, "1"); |
| props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID); |
| props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "WorkerTestConnector"); |
| |
| EasyMock.expect(connector.version()).andReturn("1.0"); |
| EasyMock.expect(plugins.compareAndSwapLoaders(connector)) |
| .andReturn(delegatingLoader) |
| .times(2); |
| connector.initialize(anyObject(ConnectorContext.class)); |
| EasyMock.expectLastCall(); |
| connector.start(props); |
| EasyMock.expectLastCall(); |
| |
| EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)) |
| .andReturn(pluginLoader) |
| .times(2); |
| |
| connectorStatusListener.onStartup(CONNECTOR_ID); |
| EasyMock.expectLastCall(); |
| |
| // Remove |
| connector.stop(); |
| EasyMock.expectLastCall(); |
| |
| connectorStatusListener.onShutdown(CONNECTOR_ID); |
| EasyMock.expectLastCall(); |
| |
| expectStopStorage(); |
| |
| PowerMock.replayAll(); |
| |
| worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, noneConnectorClientConfigOverridePolicy); |
| worker.start(); |
| |
| assertStatistics(worker, 0, 0); |
| assertEquals(Collections.emptySet(), worker.connectorNames()); |
| worker.startConnector(CONNECTOR_ID, props, ctx, connectorStatusListener, TargetState.STARTED); |
| assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_ID)), worker.connectorNames()); |
| assertStatistics(worker, 1, 0); |
| assertStartupStatistics(worker, 1, 0, 0, 0); |
| |
| worker.stopConnector(CONNECTOR_ID); |
| assertStatistics(worker, 0, 0); |
| assertStartupStatistics(worker, 1, 0, 0, 0); |
| assertEquals(Collections.emptySet(), worker.connectorNames()); |
| // Nothing should be left, so this should effectively be a nop |
| worker.stop(); |
| assertStatistics(worker, 0, 0); |
| assertStartupStatistics(worker, 1, 0, 0, 0); |
| |
| PowerMock.verifyAll(); |
| } |
| |
| @Test |
| public void testAddConnectorByShortAlias() { |
| expectConverters(); |
| expectStartStorage(); |
| |
| EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(2); |
| EasyMock.expect(plugins.newConnector("WorkerTest")).andReturn(connector); |
| EasyMock.expect(connector.version()).andReturn("1.0"); |
| |
| Map<String, String> props = new HashMap<>(); |
| props.put(SinkConnectorConfig.TOPICS_CONFIG, "foo,bar"); |
| props.put(ConnectorConfig.TASKS_MAX_CONFIG, "1"); |
| props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID); |
| props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "WorkerTest"); |
| |
| EasyMock.expect(connector.version()).andReturn("1.0"); |
| EasyMock.expect(plugins.compareAndSwapLoaders(connector)) |
| .andReturn(delegatingLoader) |
| .times(2); |
| connector.initialize(anyObject(ConnectorContext.class)); |
| EasyMock.expectLastCall(); |
| connector.start(props); |
| EasyMock.expectLastCall(); |
| |
| EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)) |
| .andReturn(pluginLoader) |
| .times(2); |
| |
| connectorStatusListener.onStartup(CONNECTOR_ID); |
| EasyMock.expectLastCall(); |
| |
| // Remove |
| connector.stop(); |
| EasyMock.expectLastCall(); |
| |
| connectorStatusListener.onShutdown(CONNECTOR_ID); |
| EasyMock.expectLastCall(); |
| |
| expectStopStorage(); |
| |
| PowerMock.replayAll(); |
| |
| worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, noneConnectorClientConfigOverridePolicy); |
| worker.start(); |
| |
| assertStatistics(worker, 0, 0); |
| assertEquals(Collections.emptySet(), worker.connectorNames()); |
| worker.startConnector(CONNECTOR_ID, props, ctx, connectorStatusListener, TargetState.STARTED); |
| assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_ID)), worker.connectorNames()); |
| assertStatistics(worker, 1, 0); |
| |
| worker.stopConnector(CONNECTOR_ID); |
| assertStatistics(worker, 0, 0); |
| assertEquals(Collections.emptySet(), worker.connectorNames()); |
| // Nothing should be left, so this should effectively be a nop |
| worker.stop(); |
| assertStatistics(worker, 0, 0); |
| |
| PowerMock.verifyAll(); |
| } |
| |
| @Test |
| public void testStopInvalidConnector() { |
| expectConverters(); |
| expectStartStorage(); |
| |
| PowerMock.replayAll(); |
| |
| worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, noneConnectorClientConfigOverridePolicy); |
| worker.start(); |
| |
| worker.stopConnector(CONNECTOR_ID); |
| |
| PowerMock.verifyAll(); |
| } |
| |
| @Test |
| public void testReconfigureConnectorTasks() { |
| expectConverters(); |
| expectStartStorage(); |
| |
| EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(3); |
| EasyMock.expect(plugins.newConnector(WorkerTestConnector.class.getName())) |
| .andReturn(connector); |
| EasyMock.expect(connector.version()).andReturn("1.0"); |
| |
| Map<String, String> props = new HashMap<>(); |
| props.put(SinkConnectorConfig.TOPICS_CONFIG, "foo,bar"); |
| props.put(ConnectorConfig.TASKS_MAX_CONFIG, "1"); |
| props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID); |
| props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, WorkerTestConnector.class.getName()); |
| |
| EasyMock.expect(connector.version()).andReturn("1.0"); |
| EasyMock.expect(plugins.compareAndSwapLoaders(connector)) |
| .andReturn(delegatingLoader) |
| .times(3); |
| connector.initialize(anyObject(ConnectorContext.class)); |
| EasyMock.expectLastCall(); |
| connector.start(props); |
| EasyMock.expectLastCall(); |
| |
| EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)) |
| .andReturn(pluginLoader) |
| .times(3); |
| |
| connectorStatusListener.onStartup(CONNECTOR_ID); |
| EasyMock.expectLastCall(); |
| |
| // Reconfigure |
| EasyMock.<Class<? extends Task>>expect(connector.taskClass()).andReturn(TestSourceTask.class); |
| Map<String, String> taskProps = new HashMap<>(); |
| taskProps.put("foo", "bar"); |
| EasyMock.expect(connector.taskConfigs(2)).andReturn(Arrays.asList(taskProps, taskProps)); |
| |
| // Remove |
| connector.stop(); |
| EasyMock.expectLastCall(); |
| |
| connectorStatusListener.onShutdown(CONNECTOR_ID); |
| EasyMock.expectLastCall(); |
| |
| expectStopStorage(); |
| |
| PowerMock.replayAll(); |
| |
| worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, noneConnectorClientConfigOverridePolicy); |
| worker.start(); |
| |
| assertStatistics(worker, 0, 0); |
| assertEquals(Collections.emptySet(), worker.connectorNames()); |
| worker.startConnector(CONNECTOR_ID, props, ctx, connectorStatusListener, TargetState.STARTED); |
| assertStatistics(worker, 1, 0); |
| assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_ID)), worker.connectorNames()); |
| try { |
| worker.startConnector(CONNECTOR_ID, props, ctx, connectorStatusListener, TargetState.STARTED); |
| fail("Should have thrown exception when trying to add connector with same name."); |
| } catch (ConnectException e) { |
| // expected |
| } |
| Map<String, String> connProps = new HashMap<>(props); |
| connProps.put(ConnectorConfig.TASKS_MAX_CONFIG, "2"); |
| ConnectorConfig connConfig = new SinkConnectorConfig(plugins, connProps); |
| List<Map<String, String>> taskConfigs = worker.connectorTaskConfigs(CONNECTOR_ID, connConfig); |
| Map<String, String> expectedTaskProps = new HashMap<>(); |
| expectedTaskProps.put("foo", "bar"); |
| expectedTaskProps.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName()); |
| expectedTaskProps.put(SinkTask.TOPICS_CONFIG, "foo,bar"); |
| assertEquals(2, taskConfigs.size()); |
| assertEquals(expectedTaskProps, taskConfigs.get(0)); |
| assertEquals(expectedTaskProps, taskConfigs.get(1)); |
| assertStatistics(worker, 1, 0); |
| assertStartupStatistics(worker, 1, 0, 0, 0); |
| worker.stopConnector(CONNECTOR_ID); |
| assertStatistics(worker, 0, 0); |
| assertStartupStatistics(worker, 1, 0, 0, 0); |
| assertEquals(Collections.emptySet(), worker.connectorNames()); |
| // Nothing should be left, so this should effectively be a nop |
| worker.stop(); |
| assertStatistics(worker, 0, 0); |
| |
| PowerMock.verifyAll(); |
| } |
| |
| |
| @Test |
| public void testAddRemoveTask() throws Exception { |
| expectConverters(); |
| expectStartStorage(); |
| |
| EasyMock.expect(workerTask.id()).andStubReturn(TASK_ID); |
| |
| EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(2); |
| PowerMock.expectNew( |
| WorkerSourceTask.class, EasyMock.eq(TASK_ID), |
| EasyMock.eq(task), |
| anyObject(TaskStatus.Listener.class), |
| EasyMock.eq(TargetState.STARTED), |
| anyObject(JsonConverter.class), |
| anyObject(JsonConverter.class), |
| anyObject(JsonConverter.class), |
| EasyMock.eq(new TransformationChain<>(Collections.emptyList(), NOOP_OPERATOR)), |
| anyObject(KafkaProducer.class), |
| anyObject(OffsetStorageReader.class), |
| anyObject(OffsetStorageWriter.class), |
| EasyMock.eq(config), |
| anyObject(ClusterConfigState.class), |
| anyObject(ConnectMetrics.class), |
| anyObject(ClassLoader.class), |
| anyObject(Time.class), |
| anyObject(RetryWithToleranceOperator.class)) |
| .andReturn(workerTask); |
| Map<String, String> origProps = new HashMap<>(); |
| origProps.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName()); |
| |
| TaskConfig taskConfig = new TaskConfig(origProps); |
| // We should expect this call, but the pluginLoader being swapped in is only mocked. |
| // EasyMock.expect(pluginLoader.loadClass(TestSourceTask.class.getName())) |
| // .andReturn((Class) TestSourceTask.class); |
| EasyMock.expect(plugins.newTask(TestSourceTask.class)).andReturn(task); |
| EasyMock.expect(task.version()).andReturn("1.0"); |
| |
| workerTask.initialize(taskConfig); |
| EasyMock.expectLastCall(); |
| |
| // Expect that the worker will create converters and will find them using the current classloader ... |
| assertNotNull(taskKeyConverter); |
| assertNotNull(taskValueConverter); |
| assertNotNull(taskHeaderConverter); |
| expectTaskKeyConverters(ClassLoaderUsage.CURRENT_CLASSLOADER, taskKeyConverter); |
| expectTaskValueConverters(ClassLoaderUsage.CURRENT_CLASSLOADER, taskValueConverter); |
| expectTaskHeaderConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, taskHeaderConverter); |
| |
| EasyMock.expect(executorService.submit(workerTask)).andReturn(null); |
| |
| EasyMock.expect(plugins.delegatingLoader()).andReturn(delegatingLoader); |
| EasyMock.expect(delegatingLoader.connectorLoader(WorkerTestConnector.class.getName())) |
| .andReturn(pluginLoader); |
| EasyMock.expect(Plugins.compareAndSwapLoaders(pluginLoader)).andReturn(delegatingLoader) |
| .times(2); |
| |
| EasyMock.expect(workerTask.loader()).andReturn(pluginLoader); |
| |
| EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)).andReturn(pluginLoader) |
| .times(2); |
| plugins.connectorClass(WorkerTestConnector.class.getName()); |
| EasyMock.expectLastCall().andReturn(WorkerTestConnector.class); |
| // Remove |
| workerTask.stop(); |
| EasyMock.expectLastCall(); |
| EasyMock.expect(workerTask.awaitStop(EasyMock.anyLong())).andStubReturn(true); |
| EasyMock.expectLastCall(); |
| |
| expectStopStorage(); |
| |
| PowerMock.replayAll(); |
| |
| worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, executorService, |
| noneConnectorClientConfigOverridePolicy); |
| worker.start(); |
| assertStatistics(worker, 0, 0); |
| assertStartupStatistics(worker, 0, 0, 0, 0); |
| assertEquals(Collections.emptySet(), worker.taskIds()); |
| worker.startTask(TASK_ID, ClusterConfigState.EMPTY, anyConnectorConfigMap(), origProps, taskStatusListener, TargetState.STARTED); |
| assertStatistics(worker, 0, 1); |
| assertStartupStatistics(worker, 0, 0, 1, 0); |
| assertEquals(new HashSet<>(Arrays.asList(TASK_ID)), worker.taskIds()); |
| worker.stopAndAwaitTask(TASK_ID); |
| assertStatistics(worker, 0, 0); |
| assertStartupStatistics(worker, 0, 0, 1, 0); |
| assertEquals(Collections.emptySet(), worker.taskIds()); |
| // Nothing should be left, so this should effectively be a nop |
| worker.stop(); |
| assertStatistics(worker, 0, 0); |
| assertStartupStatistics(worker, 0, 0, 1, 0); |
| |
| PowerMock.verifyAll(); |
| } |
| |
| @Test |
| public void testTaskStatusMetricsStatuses() throws Exception { |
| expectConverters(); |
| expectStartStorage(); |
| |
| EasyMock.expect(workerTask.id()).andStubReturn(TASK_ID); |
| |
| EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(2); |
| PowerMock.expectNew(WorkerSourceTask.class, EasyMock.eq(TASK_ID), |
| EasyMock.eq(task), |
| anyObject(TaskStatus.Listener.class), |
| EasyMock.eq(TargetState.STARTED), |
| anyObject(JsonConverter.class), |
| anyObject(JsonConverter.class), |
| anyObject(JsonConverter.class), |
| EasyMock.eq(new TransformationChain<>(Collections.emptyList(), NOOP_OPERATOR)), |
| anyObject(KafkaProducer.class), |
| anyObject(OffsetStorageReader.class), |
| anyObject(OffsetStorageWriter.class), |
| EasyMock.eq(config), |
| anyObject(ClusterConfigState.class), |
| anyObject(ConnectMetrics.class), |
| anyObject(ClassLoader.class), |
| anyObject(Time.class), |
| anyObject(RetryWithToleranceOperator.class)).andReturn(workerTask); |
| Map<String, String> origProps = new HashMap<>(); |
| origProps.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName()); |
| |
| TaskConfig taskConfig = new TaskConfig(origProps); |
| // We should expect this call, but the pluginLoader being swapped in is only mocked. |
| // EasyMock.expect(pluginLoader.loadClass(TestSourceTask.class.getName())) |
| // .andReturn((Class) TestSourceTask.class); |
| EasyMock.expect(plugins.newTask(TestSourceTask.class)).andReturn(task); |
| EasyMock.expect(task.version()).andReturn("1.0"); |
| |
| workerTask.initialize(taskConfig); |
| EasyMock.expectLastCall(); |
| |
| // Expect that the worker will create converters and will find them using the current classloader ... |
| assertNotNull(taskKeyConverter); |
| assertNotNull(taskValueConverter); |
| assertNotNull(taskHeaderConverter); |
| expectTaskKeyConverters(ClassLoaderUsage.CURRENT_CLASSLOADER, taskKeyConverter); |
| expectTaskValueConverters(ClassLoaderUsage.CURRENT_CLASSLOADER, taskValueConverter); |
| expectTaskHeaderConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, taskHeaderConverter); |
| |
| EasyMock.expect(executorService.submit(workerTask)).andReturn(null); |
| |
| EasyMock.expect(plugins.delegatingLoader()).andReturn(delegatingLoader); |
| EasyMock.expect(delegatingLoader.connectorLoader(WorkerTestConnector.class.getName())) |
| .andReturn(pluginLoader); |
| EasyMock.expect(Plugins.compareAndSwapLoaders(pluginLoader)).andReturn(delegatingLoader) |
| .times(2); |
| |
| EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)).andReturn(pluginLoader) |
| .times(2); |
| plugins.connectorClass(WorkerTestConnector.class.getName()); |
| EasyMock.expectLastCall().andReturn(WorkerTestConnector.class); |
| |
| EasyMock.expect(workerTask.awaitStop(EasyMock.anyLong())).andStubReturn(true); |
| EasyMock.expectLastCall(); |
| |
| // Each time we check the task metrics, the worker will call the herder |
| herder.taskStatus(TASK_ID); |
| EasyMock.expectLastCall() |
| .andReturn(new ConnectorStateInfo.TaskState(0, "RUNNING", "worker", "msg")); |
| |
| herder.taskStatus(TASK_ID); |
| EasyMock.expectLastCall() |
| .andReturn(new ConnectorStateInfo.TaskState(0, "PAUSED", "worker", "msg")); |
| |
| herder.taskStatus(TASK_ID); |
| EasyMock.expectLastCall() |
| .andReturn(new ConnectorStateInfo.TaskState(0, "FAILED", "worker", "msg")); |
| |
| herder.taskStatus(TASK_ID); |
| EasyMock.expectLastCall() |
| .andReturn(new ConnectorStateInfo.TaskState(0, "DESTROYED", "worker", "msg")); |
| |
| herder.taskStatus(TASK_ID); |
| EasyMock.expectLastCall() |
| .andReturn(new ConnectorStateInfo.TaskState(0, "UNASSIGNED", "worker", "msg")); |
| |
| // Called when we stop the worker |
| EasyMock.expect(workerTask.loader()).andReturn(pluginLoader); |
| workerTask.stop(); |
| EasyMock.expectLastCall(); |
| |
| PowerMock.replayAll(); |
| |
| worker = new Worker(WORKER_ID, |
| new MockTime(), |
| plugins, |
| config, |
| offsetBackingStore, |
| executorService, |
| noneConnectorClientConfigOverridePolicy); |
| |
| worker.herder = herder; |
| |
| worker.start(); |
| assertStatistics(worker, 0, 0); |
| assertStartupStatistics(worker, 0, 0, 0, 0); |
| assertEquals(Collections.emptySet(), worker.taskIds()); |
| worker.startTask( |
| TASK_ID, |
| ClusterConfigState.EMPTY, |
| anyConnectorConfigMap(), |
| origProps, |
| taskStatusListener, |
| TargetState.STARTED); |
| |
| assertStatusMetrics(1L, "connector-running-task-count"); |
| assertStatusMetrics(1L, "connector-paused-task-count"); |
| assertStatusMetrics(1L, "connector-failed-task-count"); |
| assertStatusMetrics(1L, "connector-destroyed-task-count"); |
| assertStatusMetrics(1L, "connector-unassigned-task-count"); |
| |
| worker.stopAndAwaitTask(TASK_ID); |
| assertStatusMetrics(0L, "connector-running-task-count"); |
| assertStatusMetrics(0L, "connector-paused-task-count"); |
| assertStatusMetrics(0L, "connector-failed-task-count"); |
| assertStatusMetrics(0L, "connector-destroyed-task-count"); |
| assertStatusMetrics(0L, "connector-unassigned-task-count"); |
| |
| PowerMock.verifyAll(); |
| } |
| |
| @Test |
| public void testConnectorStatusMetricsGroup_taskStatusCounter() { |
| ConcurrentMap<ConnectorTaskId, WorkerTask> tasks = new ConcurrentHashMap<>(); |
| tasks.put(new ConnectorTaskId("c1", 0), workerTask); |
| tasks.put(new ConnectorTaskId("c1", 1), workerTask); |
| tasks.put(new ConnectorTaskId("c2", 0), workerTask); |
| |
| |
| expectConverters(); |
| |
| expectStartStorage(); |
| |
| EasyMock.expect(Plugins.compareAndSwapLoaders(pluginLoader)).andReturn(delegatingLoader); |
| EasyMock.expect(Plugins.compareAndSwapLoaders(pluginLoader)).andReturn(delegatingLoader); |
| |
| EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)).andReturn(pluginLoader); |
| |
| taskStatusListener.onFailure(EasyMock.eq(TASK_ID), EasyMock.<ConfigException>anyObject()); |
| EasyMock.expectLastCall(); |
| |
| PowerMock.replayAll(); |
| |
| worker = new Worker(WORKER_ID, |
| new MockTime(), |
| plugins, |
| config, |
| offsetBackingStore, |
| noneConnectorClientConfigOverridePolicy); |
| |
| Worker.ConnectorStatusMetricsGroup metricGroup = new Worker.ConnectorStatusMetricsGroup( |
| worker.metrics(), tasks, herder |
| ); |
| assertEquals(2L, (long) metricGroup.taskCounter("c1").metricValue(0L)); |
| assertEquals(1L, (long) metricGroup.taskCounter("c2").metricValue(0L)); |
| assertEquals(0L, (long) metricGroup.taskCounter("fakeConnector").metricValue(0L)); |
| } |
| |
| @Test |
| public void testStartTaskFailure() { |
| expectConverters(); |
| expectStartStorage(); |
| |
| Map<String, String> origProps = new HashMap<>(); |
| origProps.put(TaskConfig.TASK_CLASS_CONFIG, "missing.From.This.Workers.Classpath"); |
| |
| EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader); |
| EasyMock.expect(plugins.delegatingLoader()).andReturn(delegatingLoader); |
| EasyMock.expect(delegatingLoader.connectorLoader(WorkerTestConnector.class.getName())) |
| .andReturn(pluginLoader); |
| |
| // We would normally expect this since the plugin loader would have been swapped in. However, since we mock out |
| // all classloader changes, the call actually goes to the normal default classloader. However, this works out |
| // fine since we just wanted a ClassNotFoundException anyway. |
| // EasyMock.expect(pluginLoader.loadClass(origProps.get(TaskConfig.TASK_CLASS_CONFIG))) |
| // .andThrow(new ClassNotFoundException()); |
| |
| EasyMock.expect(Plugins.compareAndSwapLoaders(pluginLoader)) |
| .andReturn(delegatingLoader); |
| |
| EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)) |
| .andReturn(pluginLoader); |
| |
| taskStatusListener.onFailure(EasyMock.eq(TASK_ID), EasyMock.<ConfigException>anyObject()); |
| EasyMock.expectLastCall(); |
| |
| PowerMock.replayAll(); |
| |
| worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, noneConnectorClientConfigOverridePolicy); |
| worker.start(); |
| assertStatistics(worker, 0, 0); |
| assertStartupStatistics(worker, 0, 0, 0, 0); |
| |
| assertFalse(worker.startTask(TASK_ID, ClusterConfigState.EMPTY, anyConnectorConfigMap(), origProps, taskStatusListener, TargetState.STARTED)); |
| assertStartupStatistics(worker, 0, 0, 1, 1); |
| |
| assertStatistics(worker, 0, 0); |
| assertStartupStatistics(worker, 0, 0, 1, 1); |
| assertEquals(Collections.emptySet(), worker.taskIds()); |
| |
| PowerMock.verifyAll(); |
| } |
| |
| @Test |
| public void testCleanupTasksOnStop() throws Exception { |
| expectConverters(); |
| expectStartStorage(); |
| |
| EasyMock.expect(workerTask.id()).andStubReturn(TASK_ID); |
| |
| EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(2); |
| PowerMock.expectNew( |
| WorkerSourceTask.class, EasyMock.eq(TASK_ID), |
| EasyMock.eq(task), |
| anyObject(TaskStatus.Listener.class), |
| EasyMock.eq(TargetState.STARTED), |
| anyObject(JsonConverter.class), |
| anyObject(JsonConverter.class), |
| anyObject(JsonConverter.class), |
| EasyMock.eq(new TransformationChain<>(Collections.emptyList(), NOOP_OPERATOR)), |
| anyObject(KafkaProducer.class), |
| anyObject(OffsetStorageReader.class), |
| anyObject(OffsetStorageWriter.class), |
| anyObject(WorkerConfig.class), |
| anyObject(ClusterConfigState.class), |
| anyObject(ConnectMetrics.class), |
| EasyMock.eq(pluginLoader), |
| anyObject(Time.class), |
| anyObject(RetryWithToleranceOperator.class)) |
| .andReturn(workerTask); |
| Map<String, String> origProps = new HashMap<>(); |
| origProps.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName()); |
| |
| TaskConfig taskConfig = new TaskConfig(origProps); |
| // We should expect this call, but the pluginLoader being swapped in is only mocked. |
| // EasyMock.expect(pluginLoader.loadClass(TestSourceTask.class.getName())) |
| // .andReturn((Class) TestSourceTask.class); |
| EasyMock.expect(plugins.newTask(TestSourceTask.class)).andReturn(task); |
| EasyMock.expect(task.version()).andReturn("1.0"); |
| |
| workerTask.initialize(taskConfig); |
| EasyMock.expectLastCall(); |
| |
| // Expect that the worker will create converters and will not initially find them using the current classloader ... |
| assertNotNull(taskKeyConverter); |
| assertNotNull(taskValueConverter); |
| assertNotNull(taskHeaderConverter); |
| expectTaskKeyConverters(ClassLoaderUsage.CURRENT_CLASSLOADER, null); |
| expectTaskKeyConverters(ClassLoaderUsage.PLUGINS, taskKeyConverter); |
| expectTaskValueConverters(ClassLoaderUsage.CURRENT_CLASSLOADER, null); |
| expectTaskValueConverters(ClassLoaderUsage.PLUGINS, taskValueConverter); |
| expectTaskHeaderConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, null); |
| expectTaskHeaderConverter(ClassLoaderUsage.PLUGINS, taskHeaderConverter); |
| |
| EasyMock.expect(executorService.submit(workerTask)).andReturn(null); |
| |
| EasyMock.expect(plugins.delegatingLoader()).andReturn(delegatingLoader); |
| EasyMock.expect(delegatingLoader.connectorLoader(WorkerTestConnector.class.getName())) |
| .andReturn(pluginLoader); |
| |
| EasyMock.expect(Plugins.compareAndSwapLoaders(pluginLoader)).andReturn(delegatingLoader) |
| .times(2); |
| |
| EasyMock.expect(workerTask.loader()).andReturn(pluginLoader); |
| |
| EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)).andReturn(pluginLoader) |
| .times(2); |
| plugins.connectorClass(WorkerTestConnector.class.getName()); |
| EasyMock.expectLastCall().andReturn(WorkerTestConnector.class); |
| // Remove on Worker.stop() |
| workerTask.stop(); |
| EasyMock.expectLastCall(); |
| |
| EasyMock.expect(workerTask.awaitStop(EasyMock.anyLong())).andReturn(true); |
| // Note that in this case we *do not* commit offsets since it's an unclean shutdown |
| EasyMock.expectLastCall(); |
| |
| expectStopStorage(); |
| |
| PowerMock.replayAll(); |
| |
| worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, executorService, |
| noneConnectorClientConfigOverridePolicy); |
| worker.start(); |
| assertStatistics(worker, 0, 0); |
| worker.startTask(TASK_ID, ClusterConfigState.EMPTY, anyConnectorConfigMap(), origProps, taskStatusListener, TargetState.STARTED); |
| assertStatistics(worker, 0, 1); |
| worker.stop(); |
| assertStatistics(worker, 0, 0); |
| |
| PowerMock.verifyAll(); |
| } |
| |
| @Test |
| public void testConverterOverrides() throws Exception { |
| expectConverters(); |
| expectStartStorage(); |
| |
| EasyMock.expect(workerTask.id()).andStubReturn(TASK_ID); |
| |
| Capture<TestConverter> keyConverter = EasyMock.newCapture(); |
| Capture<TestConfigurableConverter> valueConverter = EasyMock.newCapture(); |
| Capture<HeaderConverter> headerConverter = EasyMock.newCapture(); |
| |
| EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(2); |
| PowerMock.expectNew( |
| WorkerSourceTask.class, EasyMock.eq(TASK_ID), |
| EasyMock.eq(task), |
| anyObject(TaskStatus.Listener.class), |
| EasyMock.eq(TargetState.STARTED), |
| EasyMock.capture(keyConverter), |
| EasyMock.capture(valueConverter), |
| EasyMock.capture(headerConverter), |
| EasyMock.eq(new TransformationChain<>(Collections.emptyList(), NOOP_OPERATOR)), |
| anyObject(KafkaProducer.class), |
| anyObject(OffsetStorageReader.class), |
| anyObject(OffsetStorageWriter.class), |
| anyObject(WorkerConfig.class), |
| anyObject(ClusterConfigState.class), |
| anyObject(ConnectMetrics.class), |
| EasyMock.eq(pluginLoader), |
| anyObject(Time.class), |
| anyObject(RetryWithToleranceOperator.class)) |
| .andReturn(workerTask); |
| Map<String, String> origProps = new HashMap<>(); |
| origProps.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName()); |
| |
| TaskConfig taskConfig = new TaskConfig(origProps); |
| // We should expect this call, but the pluginLoader being swapped in is only mocked. |
| // EasyMock.expect(pluginLoader.loadClass(TestSourceTask.class.getName())) |
| // .andReturn((Class) TestSourceTask.class); |
| EasyMock.expect(plugins.newTask(TestSourceTask.class)).andReturn(task); |
| EasyMock.expect(task.version()).andReturn("1.0"); |
| |
| workerTask.initialize(taskConfig); |
| EasyMock.expectLastCall(); |
| |
| // Expect that the worker will create converters and will not initially find them using the current classloader ... |
| assertNotNull(taskKeyConverter); |
| assertNotNull(taskValueConverter); |
| assertNotNull(taskHeaderConverter); |
| expectTaskKeyConverters(ClassLoaderUsage.CURRENT_CLASSLOADER, null); |
| expectTaskKeyConverters(ClassLoaderUsage.PLUGINS, taskKeyConverter); |
| expectTaskValueConverters(ClassLoaderUsage.CURRENT_CLASSLOADER, null); |
| expectTaskValueConverters(ClassLoaderUsage.PLUGINS, taskValueConverter); |
| expectTaskHeaderConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, null); |
| expectTaskHeaderConverter(ClassLoaderUsage.PLUGINS, taskHeaderConverter); |
| |
| EasyMock.expect(executorService.submit(workerTask)).andReturn(null); |
| |
| EasyMock.expect(plugins.delegatingLoader()).andReturn(delegatingLoader); |
| EasyMock.expect(delegatingLoader.connectorLoader(WorkerTestConnector.class.getName())) |
| .andReturn(pluginLoader); |
| |
| EasyMock.expect(Plugins.compareAndSwapLoaders(pluginLoader)).andReturn(delegatingLoader) |
| .times(2); |
| |
| EasyMock.expect(workerTask.loader()).andReturn(pluginLoader); |
| |
| EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)).andReturn(pluginLoader) |
| .times(2); |
| plugins.connectorClass(WorkerTestConnector.class.getName()); |
| EasyMock.expectLastCall().andReturn(WorkerTestConnector.class); |
| |
| // Remove |
| workerTask.stop(); |
| EasyMock.expectLastCall(); |
| EasyMock.expect(workerTask.awaitStop(EasyMock.anyLong())).andStubReturn(true); |
| EasyMock.expectLastCall(); |
| |
| expectStopStorage(); |
| |
| PowerMock.replayAll(); |
| |
| worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, executorService, |
| noneConnectorClientConfigOverridePolicy); |
| worker.start(); |
| assertStatistics(worker, 0, 0); |
| assertEquals(Collections.emptySet(), worker.taskIds()); |
| Map<String, String> connProps = anyConnectorConfigMap(); |
| connProps.put(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, TestConverter.class.getName()); |
| connProps.put("key.converter.extra.config", "foo"); |
| connProps.put(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, TestConfigurableConverter.class.getName()); |
| connProps.put("value.converter.extra.config", "bar"); |
| worker.startTask(TASK_ID, ClusterConfigState.EMPTY, connProps, origProps, taskStatusListener, TargetState.STARTED); |
| assertStatistics(worker, 0, 1); |
| assertEquals(new HashSet<>(Arrays.asList(TASK_ID)), worker.taskIds()); |
| worker.stopAndAwaitTask(TASK_ID); |
| assertStatistics(worker, 0, 0); |
| assertEquals(Collections.emptySet(), worker.taskIds()); |
| // Nothing should be left, so this should effectively be a nop |
| worker.stop(); |
| assertStatistics(worker, 0, 0); |
| |
| // We've mocked the Plugin.newConverter method, so we don't currently configure the converters |
| |
| PowerMock.verifyAll(); |
| } |
| |
| @Test |
| public void testProducerConfigsWithoutOverrides() { |
| EasyMock.expect(connectorConfig.originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX)).andReturn( |
| new HashMap<String, Object>()); |
| PowerMock.replayAll(); |
| Map<String, String> expectedConfigs = new HashMap<>(defaultProducerConfigs); |
| expectedConfigs.put("client.id", "connector-producer-job-0"); |
| assertEquals(expectedConfigs, |
| Worker.producerConfigs(TASK_ID, "connector-producer-" + TASK_ID, config, connectorConfig, null, noneConnectorClientConfigOverridePolicy)); |
| } |
| |
| @Test |
| public void testProducerConfigsWithOverrides() { |
| Map<String, String> props = new HashMap<>(workerProps); |
| props.put("producer.acks", "-1"); |
| props.put("producer.linger.ms", "1000"); |
| props.put("producer.client.id", "producer-test-id"); |
| WorkerConfig configWithOverrides = new StandaloneConfig(props); |
| |
| Map<String, String> expectedConfigs = new HashMap<>(defaultProducerConfigs); |
| expectedConfigs.put("acks", "-1"); |
| expectedConfigs.put("linger.ms", "1000"); |
| expectedConfigs.put("client.id", "producer-test-id"); |
| EasyMock.expect(connectorConfig.originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX)).andReturn( |
| new HashMap<String, Object>()); |
| PowerMock.replayAll(); |
| assertEquals(expectedConfigs, |
| Worker.producerConfigs(TASK_ID, "connector-producer-" + TASK_ID, configWithOverrides, connectorConfig, null, allConnectorClientConfigOverridePolicy)); |
| } |
| |
| @Test |
| public void testProducerConfigsWithClientOverrides() { |
| Map<String, String> props = new HashMap<>(workerProps); |
| props.put("producer.acks", "-1"); |
| props.put("producer.linger.ms", "1000"); |
| props.put("producer.client.id", "producer-test-id"); |
| WorkerConfig configWithOverrides = new StandaloneConfig(props); |
| |
| Map<String, String> expectedConfigs = new HashMap<>(defaultProducerConfigs); |
| expectedConfigs.put("acks", "-1"); |
| expectedConfigs.put("linger.ms", "5000"); |
| expectedConfigs.put("batch.size", "1000"); |
| expectedConfigs.put("client.id", "producer-test-id"); |
| Map<String, Object> connConfig = new HashMap<String, Object>(); |
| connConfig.put("linger.ms", "5000"); |
| connConfig.put("batch.size", "1000"); |
| EasyMock.expect(connectorConfig.originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX)) |
| .andReturn(connConfig); |
| PowerMock.replayAll(); |
| assertEquals(expectedConfigs, |
| Worker.producerConfigs(TASK_ID, "connector-producer-" + TASK_ID, configWithOverrides, connectorConfig, null, allConnectorClientConfigOverridePolicy)); |
| } |
| |
| @Test |
| public void testConsumerConfigsWithoutOverrides() { |
| Map<String, String> expectedConfigs = new HashMap<>(defaultConsumerConfigs); |
| expectedConfigs.put("group.id", "connect-test"); |
| expectedConfigs.put("client.id", "connector-consumer-test-1"); |
| EasyMock.expect(connectorConfig.originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX)).andReturn(new HashMap<>()); |
| PowerMock.replayAll(); |
| assertEquals(expectedConfigs, Worker.consumerConfigs(new ConnectorTaskId("test", 1), config, connectorConfig, |
| null, noneConnectorClientConfigOverridePolicy)); |
| } |
| |
| @Test |
| public void testConsumerConfigsWithOverrides() { |
| Map<String, String> props = new HashMap<>(workerProps); |
| props.put("consumer.auto.offset.reset", "latest"); |
| props.put("consumer.max.poll.records", "1000"); |
| props.put("consumer.client.id", "consumer-test-id"); |
| WorkerConfig configWithOverrides = new StandaloneConfig(props); |
| |
| Map<String, String> expectedConfigs = new HashMap<>(defaultConsumerConfigs); |
| expectedConfigs.put("group.id", "connect-test"); |
| expectedConfigs.put("auto.offset.reset", "latest"); |
| expectedConfigs.put("max.poll.records", "1000"); |
| expectedConfigs.put("client.id", "consumer-test-id"); |
| EasyMock.expect(connectorConfig.originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX)).andReturn(new HashMap<>()); |
| PowerMock.replayAll(); |
| assertEquals(expectedConfigs, Worker.consumerConfigs(new ConnectorTaskId("test", 1), configWithOverrides, connectorConfig, |
| null, noneConnectorClientConfigOverridePolicy)); |
| |
| } |
| |
| @Test |
| public void testConsumerConfigsWithClientOverrides() { |
| Map<String, String> props = new HashMap<>(workerProps); |
| props.put("consumer.auto.offset.reset", "latest"); |
| props.put("consumer.max.poll.records", "5000"); |
| WorkerConfig configWithOverrides = new StandaloneConfig(props); |
| |
| Map<String, String> expectedConfigs = new HashMap<>(defaultConsumerConfigs); |
| expectedConfigs.put("group.id", "connect-test"); |
| expectedConfigs.put("auto.offset.reset", "latest"); |
| expectedConfigs.put("max.poll.records", "5000"); |
| expectedConfigs.put("max.poll.interval.ms", "1000"); |
| expectedConfigs.put("client.id", "connector-consumer-test-1"); |
| Map<String, Object> connConfig = new HashMap<String, Object>(); |
| connConfig.put("max.poll.records", "5000"); |
| connConfig.put("max.poll.interval.ms", "1000"); |
| EasyMock.expect(connectorConfig.originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX)) |
| .andReturn(connConfig); |
| PowerMock.replayAll(); |
| assertEquals(expectedConfigs, Worker.consumerConfigs(new ConnectorTaskId("test", 1), configWithOverrides, connectorConfig, |
| null, allConnectorClientConfigOverridePolicy)); |
| } |
| |
| @Test(expected = ConnectException.class) |
| public void testConsumerConfigsClientOverridesWithNonePolicy() { |
| Map<String, String> props = new HashMap<>(workerProps); |
| props.put("consumer.auto.offset.reset", "latest"); |
| props.put("consumer.max.poll.records", "5000"); |
| WorkerConfig configWithOverrides = new StandaloneConfig(props); |
| |
| Map<String, Object> connConfig = new HashMap<String, Object>(); |
| connConfig.put("max.poll.records", "5000"); |
| connConfig.put("max.poll.interval.ms", "1000"); |
| EasyMock.expect(connectorConfig.originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX)) |
| .andReturn(connConfig); |
| PowerMock.replayAll(); |
| Worker.consumerConfigs(new ConnectorTaskId("test", 1), configWithOverrides, connectorConfig, |
| null, noneConnectorClientConfigOverridePolicy); |
| } |
| |
| @Test |
| public void testAdminConfigsClientOverridesWithAllPolicy() { |
| Map<String, String> props = new HashMap<>(workerProps); |
| props.put("admin.client.id", "testid"); |
| props.put("admin.metadata.max.age.ms", "5000"); |
| props.put("producer.bootstrap.servers", "cbeauho.com"); |
| props.put("consumer.bootstrap.servers", "localhost:4761"); |
| WorkerConfig configWithOverrides = new StandaloneConfig(props); |
| |
| Map<String, Object> connConfig = new HashMap<String, Object>(); |
| connConfig.put("metadata.max.age.ms", "10000"); |
| |
| Map<String, String> expectedConfigs = new HashMap<>(workerProps); |
| |
| expectedConfigs.put("bootstrap.servers", "localhost:9092"); |
| expectedConfigs.put("client.id", "testid"); |
| expectedConfigs.put("metadata.max.age.ms", "10000"); |
| |
| EasyMock.expect(connectorConfig.originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX)) |
| .andReturn(connConfig); |
| PowerMock.replayAll(); |
| assertEquals(expectedConfigs, Worker.adminConfigs(new ConnectorTaskId("test", 1), configWithOverrides, connectorConfig, |
| null, allConnectorClientConfigOverridePolicy)); |
| } |
| |
| @Test(expected = ConnectException.class) |
| public void testAdminConfigsClientOverridesWithNonePolicy() { |
| Map<String, String> props = new HashMap<>(workerProps); |
| props.put("admin.client.id", "testid"); |
| props.put("admin.metadata.max.age.ms", "5000"); |
| WorkerConfig configWithOverrides = new StandaloneConfig(props); |
| |
| Map<String, Object> connConfig = new HashMap<String, Object>(); |
| connConfig.put("metadata.max.age.ms", "10000"); |
| |
| EasyMock.expect(connectorConfig.originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX)) |
| .andReturn(connConfig); |
| PowerMock.replayAll(); |
| Worker.adminConfigs(new ConnectorTaskId("test", 1), configWithOverrides, connectorConfig, |
| null, noneConnectorClientConfigOverridePolicy); |
| } |
| |
| private void assertStatusMetrics(long expected, String metricName) { |
| MetricGroup statusMetrics = worker.connectorStatusMetricsGroup().metricGroup(TASK_ID.connector()); |
| if (expected == 0L) { |
| assertNull(statusMetrics); |
| return; |
| } |
| assertEquals(expected, (long) MockConnectMetrics.currentMetricValue(worker.metrics(), statusMetrics, metricName)); |
| } |
| |
| private void assertStatistics(Worker worker, int connectors, int tasks) { |
| assertStatusMetrics((long) tasks, "connector-total-task-count"); |
| MetricGroup workerMetrics = worker.workerMetricsGroup().metricGroup(); |
| assertEquals(connectors, MockConnectMetrics.currentMetricValueAsDouble(worker.metrics(), workerMetrics, "connector-count"), 0.0001d); |
| assertEquals(tasks, MockConnectMetrics.currentMetricValueAsDouble(worker.metrics(), workerMetrics, "task-count"), 0.0001d); |
| assertEquals(tasks, MockConnectMetrics.currentMetricValueAsDouble(worker.metrics(), workerMetrics, "task-count"), 0.0001d); |
| } |
| |
| private void assertStartupStatistics(Worker worker, int connectorStartupAttempts, int connectorStartupFailures, int taskStartupAttempts, int taskStartupFailures) { |
| double connectStartupSuccesses = connectorStartupAttempts - connectorStartupFailures; |
| double taskStartupSuccesses = taskStartupAttempts - taskStartupFailures; |
| double connectStartupSuccessPct = 0.0d; |
| double connectStartupFailurePct = 0.0d; |
| double taskStartupSuccessPct = 0.0d; |
| double taskStartupFailurePct = 0.0d; |
| if (connectorStartupAttempts != 0) { |
| connectStartupSuccessPct = connectStartupSuccesses / connectorStartupAttempts; |
| connectStartupFailurePct = (double) connectorStartupFailures / connectorStartupAttempts; |
| } |
| if (taskStartupAttempts != 0) { |
| taskStartupSuccessPct = taskStartupSuccesses / taskStartupAttempts; |
| taskStartupFailurePct = (double) taskStartupFailures / taskStartupAttempts; |
| } |
| MetricGroup workerMetrics = worker.workerMetricsGroup().metricGroup(); |
| assertEquals(connectorStartupAttempts, MockConnectMetrics.currentMetricValueAsDouble(worker.metrics(), workerMetrics, "connector-startup-attempts-total"), 0.0001d); |
| assertEquals(connectStartupSuccesses, MockConnectMetrics.currentMetricValueAsDouble(worker.metrics(), workerMetrics, "connector-startup-success-total"), 0.0001d); |
| assertEquals(connectorStartupFailures, MockConnectMetrics.currentMetricValueAsDouble(worker.metrics(), workerMetrics, "connector-startup-failure-total"), 0.0001d); |
| assertEquals(connectStartupSuccessPct, MockConnectMetrics.currentMetricValueAsDouble(worker.metrics(), workerMetrics, "connector-startup-success-percentage"), 0.0001d); |
| assertEquals(connectStartupFailurePct, MockConnectMetrics.currentMetricValueAsDouble(worker.metrics(), workerMetrics, "connector-startup-failure-percentage"), 0.0001d); |
| assertEquals(taskStartupAttempts, MockConnectMetrics.currentMetricValueAsDouble(worker.metrics(), workerMetrics, "task-startup-attempts-total"), 0.0001d); |
| assertEquals(taskStartupSuccesses, MockConnectMetrics.currentMetricValueAsDouble(worker.metrics(), workerMetrics, "task-startup-success-total"), 0.0001d); |
| assertEquals(taskStartupFailures, MockConnectMetrics.currentMetricValueAsDouble(worker.metrics(), workerMetrics, "task-startup-failure-total"), 0.0001d); |
| assertEquals(taskStartupSuccessPct, MockConnectMetrics.currentMetricValueAsDouble(worker.metrics(), workerMetrics, "task-startup-success-percentage"), 0.0001d); |
| assertEquals(taskStartupFailurePct, MockConnectMetrics.currentMetricValueAsDouble(worker.metrics(), workerMetrics, "task-startup-failure-percentage"), 0.0001d); |
| } |
| |
| private void expectStartStorage() { |
| offsetBackingStore.configure(anyObject(WorkerConfig.class)); |
| EasyMock.expectLastCall(); |
| offsetBackingStore.start(); |
| EasyMock.expectLastCall(); |
| } |
| |
| private void expectStopStorage() { |
| offsetBackingStore.stop(); |
| EasyMock.expectLastCall(); |
| } |
| |
| private void expectConverters() { |
| expectConverters(JsonConverter.class, false); |
| } |
| |
| private void expectConverters(Boolean expectDefaultConverters) { |
| expectConverters(JsonConverter.class, expectDefaultConverters); |
| } |
| |
| @SuppressWarnings("deprecation") |
| private void expectConverters(Class<? extends Converter> converterClass, Boolean expectDefaultConverters) { |
| // As default converters are instantiated when a task starts, they are expected only if the `startTask` method is called |
| if (expectDefaultConverters) { |
| |
| // Instantiate and configure default |
| EasyMock.expect(plugins.newConverter(config, WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.PLUGINS)) |
| .andReturn(keyConverter); |
| EasyMock.expect(plugins.newConverter(config, WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.PLUGINS)) |
| .andReturn(valueConverter); |
| EasyMock.expectLastCall(); |
| } |
| |
| //internal |
| Converter internalKeyConverter = PowerMock.createMock(converterClass); |
| Converter internalValueConverter = PowerMock.createMock(converterClass); |
| |
| // Instantiate and configure internal |
| EasyMock.expect( |
| plugins.newConverter( |
| config, |
| WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, |
| ClassLoaderUsage.PLUGINS |
| ) |
| ).andReturn(internalKeyConverter); |
| EasyMock.expect( |
| plugins.newConverter( |
| config, |
| WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, |
| ClassLoaderUsage.PLUGINS |
| ) |
| ).andReturn(internalValueConverter); |
| EasyMock.expectLastCall(); |
| } |
| |
| private void expectTaskKeyConverters(ClassLoaderUsage classLoaderUsage, Converter returning) { |
| EasyMock.expect( |
| plugins.newConverter( |
| anyObject(AbstractConfig.class), |
| eq(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG), |
| eq(classLoaderUsage))) |
| .andReturn(returning); |
| } |
| |
| private void expectTaskValueConverters(ClassLoaderUsage classLoaderUsage, Converter returning) { |
| EasyMock.expect( |
| plugins.newConverter( |
| anyObject(AbstractConfig.class), |
| eq(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG), |
| eq(classLoaderUsage))) |
| .andReturn(returning); |
| } |
| |
| private void expectTaskHeaderConverter(ClassLoaderUsage classLoaderUsage, HeaderConverter returning) { |
| EasyMock.expect( |
| plugins.newHeaderConverter( |
| anyObject(AbstractConfig.class), |
| eq(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG), |
| eq(classLoaderUsage))) |
| .andReturn(returning); |
| } |
| |
| private Map<String, String> anyConnectorConfigMap() { |
| Map<String, String> props = new HashMap<>(); |
| props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID); |
| props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, WorkerTestConnector.class.getName()); |
| props.put(ConnectorConfig.TASKS_MAX_CONFIG, "1"); |
| return props; |
| } |
| |
| /* Name here needs to be unique as we are testing the aliasing mechanism */ |
| public static class WorkerTestConnector extends Connector { |
| |
| private static final ConfigDef CONFIG_DEF = new ConfigDef() |
| .define("configName", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "Test configName."); |
| |
| @Override |
| public String version() { |
| return "1.0"; |
| } |
| |
| @Override |
| public void start(Map<String, String> props) { |
| |
| } |
| |
| @Override |
| public Class<? extends Task> taskClass() { |
| return null; |
| } |
| |
| @Override |
| public List<Map<String, String>> taskConfigs(int maxTasks) { |
| return null; |
| } |
| |
| @Override |
| public void stop() { |
| |
| } |
| |
| @Override |
| public ConfigDef config() { |
| return CONFIG_DEF; |
| } |
| } |
| |
| private static class TestSourceTask extends SourceTask { |
| public TestSourceTask() { |
| } |
| |
| @Override |
| public String version() { |
| return "1.0"; |
| } |
| |
| @Override |
| public void start(Map<String, String> props) { |
| } |
| |
| @Override |
| public List<SourceRecord> poll() throws InterruptedException { |
| return null; |
| } |
| |
| @Override |
| public void stop() { |
| } |
| } |
| |
| public static class TestConverter implements Converter { |
| public Map<String, ?> configs; |
| |
| @Override |
| public void configure(Map<String, ?> configs, boolean isKey) { |
| this.configs = configs; |
| } |
| |
| @Override |
| public byte[] fromConnectData(String topic, Schema schema, Object value) { |
| return new byte[0]; |
| } |
| |
| @Override |
| public SchemaAndValue toConnectData(String topic, byte[] value) { |
| return null; |
| } |
| } |
| |
| public static class TestConfigurableConverter implements Converter, Configurable { |
| public Map<String, ?> configs; |
| |
| public ConfigDef config() { |
| return JsonConverterConfig.configDef(); |
| } |
| |
| @Override |
| public void configure(Map<String, ?> configs) { |
| this.configs = configs; |
| new JsonConverterConfig(configs); // requires the `converter.type` config be set |
| } |
| |
| @Override |
| public void configure(Map<String, ?> configs, boolean isKey) { |
| this.configs = configs; |
| } |
| |
| @Override |
| public byte[] fromConnectData(String topic, Schema schema, Object value) { |
| return new byte[0]; |
| } |
| |
| @Override |
| public SchemaAndValue toConnectData(String topic, byte[] value) { |
| return null; |
| } |
| } |
| } |