Adjusted test code to avoid recycling test topics
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/cw/sink/CamelSinkAWSCWITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/cw/sink/CamelSinkAWSCWITCase.java
index c0fce3a..e862464 100644
--- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/cw/sink/CamelSinkAWSCWITCase.java
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/cw/sink/CamelSinkAWSCWITCase.java
@@ -27,7 +27,6 @@
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
import org.apache.camel.kafkaconnector.common.test.StringMessageProducer;
-import org.apache.camel.kafkaconnector.common.utils.CamelKafkaConnectorTestUtils;
import org.apache.camel.test.infra.aws.common.services.AWSService;
import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
import org.apache.camel.test.infra.aws2.services.AWSLocalContainerService;
@@ -160,7 +159,7 @@
@Timeout(value = 120)
public void testBasicSendReceive() throws Exception {
Properties amazonProperties = awsService.getConnectionProperties();
- String topicName = CamelKafkaConnectorTestUtils.getDefaultTestTopic(this.getClass());
+ String topicName = getTopicForTest(this);
ConnectorPropertyFactory testProperties = CamelAWSCWPropertyFactory
.basic()
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/ec2/sink/CamelSinkAWSEC2ITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/ec2/sink/CamelSinkAWSEC2ITCase.java
index a8f7a1b..db02c0e 100644
--- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/ec2/sink/CamelSinkAWSEC2ITCase.java
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/ec2/sink/CamelSinkAWSEC2ITCase.java
@@ -29,7 +29,6 @@
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
import org.apache.camel.kafkaconnector.common.test.StringMessageProducer;
-import org.apache.camel.kafkaconnector.common.utils.CamelKafkaConnectorTestUtils;
import org.apache.camel.test.infra.aws.common.services.AWSService;
import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
import org.apache.camel.test.infra.aws2.services.AWSServiceFactory;
@@ -138,7 +137,7 @@
@Timeout(90)
public void testBasicSendReceive() throws Exception {
Properties amazonProperties = awsService.getConnectionProperties();
- String topicName = CamelKafkaConnectorTestUtils.getDefaultTestTopic(this.getClass());
+ String topicName = getTopicForTest(this);
ConnectorPropertyFactory testProperties = CamelAWSEC2PropertyFactory
.basic()
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/iam/sink/CamelSinkAWSIAMITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/iam/sink/CamelSinkAWSIAMITCase.java
index d524168..2764969 100644
--- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/iam/sink/CamelSinkAWSIAMITCase.java
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/iam/sink/CamelSinkAWSIAMITCase.java
@@ -29,7 +29,6 @@
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
import org.apache.camel.kafkaconnector.common.test.StringMessageProducer;
-import org.apache.camel.kafkaconnector.common.utils.CamelKafkaConnectorTestUtils;
import org.apache.camel.test.infra.aws.common.services.AWSService;
import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
import org.apache.camel.test.infra.aws2.services.AWSServiceFactory;
@@ -131,7 +130,7 @@
@Timeout(90)
public void testBasicSendReceive() throws Exception {
Properties amazonProperties = awsService.getConnectionProperties();
- String topicName = CamelKafkaConnectorTestUtils.getDefaultTestTopic(this.getClass());
+ String topicName = getTopicForTest(this);
ConnectorPropertyFactory testProperties = CamelAWSIAMPropertyFactory
.basic()
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/sink/CamelSinkAWSKinesisITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/sink/CamelSinkAWSKinesisITCase.java
index 61f3daa..1f3ecda 100644
--- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/sink/CamelSinkAWSKinesisITCase.java
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/sink/CamelSinkAWSKinesisITCase.java
@@ -30,7 +30,6 @@
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
import org.apache.camel.kafkaconnector.common.test.StringMessageProducer;
-import org.apache.camel.kafkaconnector.common.utils.CamelKafkaConnectorTestUtils;
import org.apache.camel.test.infra.aws.common.AWSCommon;
import org.apache.camel.test.infra.aws.common.services.AWSService;
import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
@@ -141,7 +140,7 @@
@Timeout(120)
public void testBasicSendReceive() throws Exception {
Properties amazonProperties = awsService.getConnectionProperties();
- String topicName = CamelKafkaConnectorTestUtils.getDefaultTestTopic(this.getClass());
+ String topicName = getTopicForTest(this);
ConnectorPropertyFactory connectorPropertyFactory = CamelAWSKinesisPropertyFactory
.basic()
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kms/sink/CamelSinkAWSKMSITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kms/sink/CamelSinkAWSKMSITCase.java
index c88e8ef..3aef0c3 100644
--- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kms/sink/CamelSinkAWSKMSITCase.java
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kms/sink/CamelSinkAWSKMSITCase.java
@@ -28,7 +28,6 @@
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
import org.apache.camel.kafkaconnector.common.test.StringMessageProducer;
-import org.apache.camel.kafkaconnector.common.utils.CamelKafkaConnectorTestUtils;
import org.apache.camel.test.infra.aws.common.services.AWSService;
import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
import org.apache.camel.test.infra.aws2.services.AWSServiceFactory;
@@ -144,7 +143,7 @@
@Timeout(120)
public void testBasicSendReceive() throws Exception {
Properties amazonProperties = awsService.getConnectionProperties();
- String topicName = CamelKafkaConnectorTestUtils.getDefaultTestTopic(this.getClass());
+ String topicName = getTopicForTest(this);
ConnectorPropertyFactory connectorPropertyFactory = CamelAWSKMSPropertyFactory
.basic()
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/lambda/sink/CamelSinkLambdaITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/lambda/sink/CamelSinkLambdaITCase.java
index 1af40aa..309fbca 100644
--- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/lambda/sink/CamelSinkLambdaITCase.java
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/lambda/sink/CamelSinkLambdaITCase.java
@@ -36,7 +36,6 @@
import org.apache.camel.kafkaconnector.common.clients.kafka.ProducerPropertyFactory;
import org.apache.camel.kafkaconnector.common.test.AbstractTestMessageProducer;
import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
-import org.apache.camel.kafkaconnector.common.utils.CamelKafkaConnectorTestUtils;
import org.apache.camel.test.infra.aws.common.services.AWSService;
import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
import org.apache.camel.test.infra.aws2.services.AWSServiceFactory;
@@ -174,7 +173,7 @@
@Timeout(90)
public void testBasicSendReceive() throws Exception {
Properties amazonProperties = awsService.getConnectionProperties();
- String topicName = CamelKafkaConnectorTestUtils.getDefaultTestTopic(this.getClass());
+ String topicName = getTopicForTest(this);
ConnectorPropertyFactory testProperties = CamelAWSLambdaPropertyFactory
.basic()
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/sink/CamelSinkAWSS3ITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/sink/CamelSinkAWSS3ITCase.java
index e489c63..565aceb 100644
--- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/sink/CamelSinkAWSS3ITCase.java
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/sink/CamelSinkAWSS3ITCase.java
@@ -30,7 +30,6 @@
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
import org.apache.camel.kafkaconnector.common.test.StringMessageProducer;
-import org.apache.camel.kafkaconnector.common.utils.CamelKafkaConnectorTestUtils;
import org.apache.camel.test.infra.aws.common.AWSCommon;
import org.apache.camel.test.infra.aws.common.services.AWSService;
import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
@@ -151,7 +150,7 @@
@Timeout(180)
public void testBasicSendReceive() throws Exception {
Properties amazonProperties = service.getConnectionProperties();
- String topicName = CamelKafkaConnectorTestUtils.getDefaultTestTopic(this.getClass());
+ String topicName = getTopicForTest(this);
ConnectorPropertyFactory testProperties = CamelAWSS3PropertyFactory
.basic()
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/source/CamelSourceAWSS3LargeFilesITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/source/CamelSourceAWSS3LargeFilesITCase.java
index e44d61b..f8ae34e 100644
--- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/source/CamelSourceAWSS3LargeFilesITCase.java
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/source/CamelSourceAWSS3LargeFilesITCase.java
@@ -27,7 +27,6 @@
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport;
import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer;
-import org.apache.camel.kafkaconnector.common.utils.CamelKafkaConnectorTestUtils;
import org.apache.camel.test.infra.aws.common.AWSCommon;
import org.apache.camel.test.infra.aws.common.services.AWSService;
import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
@@ -94,7 +93,7 @@
@BeforeEach
public void setUp() {
- topicName = CamelKafkaConnectorTestUtils.getDefaultTestTopic(this.getClass());
+ topicName = getTopicForTest(this);
awsS3Client = AWSSDKClientUtils.newS3Client();
bucketName = AWSCommon.DEFAULT_S3_BUCKET + TestUtils.randomWithRange(0, 100);
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/sink/CamelSinkAWSSQSITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/sink/CamelSinkAWSSQSITCase.java
index fddb565..f688611 100644
--- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/sink/CamelSinkAWSSQSITCase.java
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/sink/CamelSinkAWSSQSITCase.java
@@ -25,7 +25,6 @@
import org.apache.camel.kafkaconnector.aws.v2.clients.AWSSQSClient;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
-import org.apache.camel.kafkaconnector.common.utils.CamelKafkaConnectorTestUtils;
import org.apache.camel.test.infra.aws.common.AWSCommon;
import org.apache.camel.test.infra.aws.common.AWSConfigs;
import org.apache.camel.test.infra.aws.common.services.AWSService;
@@ -129,7 +128,7 @@
public void testBasicSendReceive() {
try {
Properties amazonProperties = awsService.getConnectionProperties();
- String topicName = CamelKafkaConnectorTestUtils.getDefaultTestTopic(this.getClass());
+ String topicName = getTopicForTest(this);
ConnectorPropertyFactory testProperties = CamelAWSSQSPropertyFactory
.basic()
@@ -153,7 +152,7 @@
public void testBasicSendReceiveUsingKafkaStyle() {
try {
Properties amazonProperties = awsService.getConnectionProperties();
- String topicName = CamelKafkaConnectorTestUtils.getDefaultTestTopic(this.getClass());
+ String topicName = getTopicForTest(this);
ConnectorPropertyFactory testProperties = CamelAWSSQSPropertyFactory
.basic()
@@ -178,7 +177,7 @@
public void testBasicSendReceiveUsingUrl() {
try {
Properties amazonProperties = awsService.getConnectionProperties();
- String topicName = CamelKafkaConnectorTestUtils.getDefaultTestTopic(this.getClass());
+ String topicName = getTopicForTest(this);
ConnectorPropertyFactory testProperties = CamelAWSSQSPropertyFactory
.basic()
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/AbstractKafkaTest.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/AbstractKafkaTest.java
index 5f6525f..8b0b155 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/AbstractKafkaTest.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/AbstractKafkaTest.java
@@ -20,7 +20,6 @@
import org.apache.camel.kafkaconnector.common.services.kafka.EmbeddedKafkaService;
import org.apache.camel.kafkaconnector.common.services.kafkaconnect.KafkaConnectRunnerFactory;
import org.apache.camel.kafkaconnector.common.services.kafkaconnect.KafkaConnectService;
-import org.apache.camel.kafkaconnector.common.utils.CamelKafkaConnectorTestUtils;
import org.apache.camel.kafkaconnector.common.utils.PropertyUtils;
import org.apache.camel.test.infra.common.TestUtils;
import org.apache.camel.test.infra.kafka.services.ContainerLocalKafkaService;
@@ -68,7 +67,16 @@
return kafkaConnectService;
}
+ /**
+ * Gets a topic name for the test class
+ * @param clazz
+ * @return
+ */
+ protected String getDefaultTestTopic(Class<?> clazz) {
+ return clazz.getName();
+ }
+
protected String getTopicForTest(Object testObject) {
- return CamelKafkaConnectorTestUtils.getDefaultTestTopic(testObject.getClass()) + "." + TestUtils.randomWithRange(0, 1000);
+ return getDefaultTestTopic(testObject.getClass()) + "." + TestUtils.randomWithRange(0, 1000);
}
}
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/utils/CamelKafkaConnectorTestUtils.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/utils/CamelKafkaConnectorTestUtils.java
deleted file mode 100644
index b3a3269..0000000
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/utils/CamelKafkaConnectorTestUtils.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.kafkaconnector.common.utils;
-
-/**
- * Test utilities
- */
-public final class CamelKafkaConnectorTestUtils {
- private CamelKafkaConnectorTestUtils() {
- }
-
-
- /**
- * Gets a topic name for the test class
- * @param clazz
- * @return
- */
- public static String getDefaultTestTopic(Class<?> clazz) {
- return clazz.getName();
- }
-}
diff --git a/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/sink/CamelSinkJDBCNoDataSourceITCase.java b/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/sink/CamelSinkJDBCNoDataSourceITCase.java
index ae6f11a..3100c5d 100644
--- a/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/sink/CamelSinkJDBCNoDataSourceITCase.java
+++ b/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/sink/CamelSinkJDBCNoDataSourceITCase.java
@@ -31,11 +31,11 @@
import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
-import org.apache.camel.kafkaconnector.common.utils.CamelKafkaConnectorTestUtils;
import org.apache.camel.kafkaconnector.jdbc.client.DatabaseClient;
import org.apache.camel.test.infra.common.TestUtils;
import org.apache.camel.test.infra.jdbc.services.JDBCService;
import org.apache.camel.test.infra.jdbc.services.JDBCServiceBuilder;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.extension.RegisterExtension;
@@ -57,6 +57,7 @@
private final int expect = 10;
private int received;
+ private String topicName;
static {
final String postgresImage = "postgres:9.6.2";
@@ -74,6 +75,11 @@
.build();
}
+ @BeforeEach
+ void setUp() {
+ topicName = getTopicForTest(this);
+ }
+
@Override
protected String[] getConnectorsInTest() {
return new String[] {"camel-jdbc-kafka-connector"};
@@ -92,7 +98,7 @@
jdbcParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "TestData", "test data " + i);
try {
- kafkaClient.produce(CamelKafkaConnectorTestUtils.getDefaultTestTopic(this.getClass()), body, jdbcParameters);
+ kafkaClient.produce(topicName, body, jdbcParameters);
} catch (ExecutionException e) {
LOG.error("Unable to produce messages: {}", e.getMessage(), e);
} catch (InterruptedException e) {
@@ -165,7 +171,7 @@
.end()
.withDataSourceName("anotherName")
.withUseHeaderAsParameters(true)
- .withTopics(CamelKafkaConnectorTestUtils.getDefaultTestTopic(this.getClass()));
+ .withTopics(topicName);
runTest(factory);
diff --git a/tests/itests-salesforce/src/test/java/org/apache/camel/kafkaconnector/salesforce/sink/CamelSinkSalesforceITCase.java b/tests/itests-salesforce/src/test/java/org/apache/camel/kafkaconnector/salesforce/sink/CamelSinkSalesforceITCase.java
index 9919573..652877e 100644
--- a/tests/itests-salesforce/src/test/java/org/apache/camel/kafkaconnector/salesforce/sink/CamelSinkSalesforceITCase.java
+++ b/tests/itests-salesforce/src/test/java/org/apache/camel/kafkaconnector/salesforce/sink/CamelSinkSalesforceITCase.java
@@ -23,7 +23,6 @@
import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
-import org.apache.camel.kafkaconnector.common.utils.CamelKafkaConnectorTestUtils;
import org.apache.camel.kafkaconnector.salesforce.clients.SalesforceCliContainer;
import org.apache.camel.kafkaconnector.salesforce.clients.SfdxCommand;
import org.apache.camel.test.infra.common.TestUtils;
@@ -96,6 +95,7 @@
private String accountName;
private boolean recordCreated;
+ private String topicName;
@Override
protected String[] getConnectorsInTest() {
@@ -105,6 +105,7 @@
@BeforeEach
public void setUp() {
accountName = "TestSinkAccount" + TestUtils.randomWithRange(1, 100);
+ topicName = getTopicForTest(this);
}
@AfterEach
@@ -166,14 +167,14 @@
LOG.info("Sending new account {}", data);
- kafkaClient.produce(CamelKafkaConnectorTestUtils.getDefaultTestTopic(this.getClass()), data);
+ kafkaClient.produce(topicName, data);
}
@Test
@Timeout(180)
public void testBasicProduce() throws ExecutionException, InterruptedException {
ConnectorPropertyFactory factory = CamelSalesforcePropertyFactory.basic()
- .withKafkaTopic(CamelKafkaConnectorTestUtils.getDefaultTestTopic(this.getClass()))
+ .withKafkaTopic(topicName)
.withUserName(userName)
.withPassword(password)
.withClientId(clientId)
diff --git a/tests/itests-salesforce/src/test/java/org/apache/camel/kafkaconnector/salesforce/source/CamelSourceSalesforceITCase.java b/tests/itests-salesforce/src/test/java/org/apache/camel/kafkaconnector/salesforce/source/CamelSourceSalesforceITCase.java
index bc2bd0f..00738be 100644
--- a/tests/itests-salesforce/src/test/java/org/apache/camel/kafkaconnector/salesforce/source/CamelSourceSalesforceITCase.java
+++ b/tests/itests-salesforce/src/test/java/org/apache/camel/kafkaconnector/salesforce/source/CamelSourceSalesforceITCase.java
@@ -25,7 +25,6 @@
import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
-import org.apache.camel.kafkaconnector.common.utils.CamelKafkaConnectorTestUtils;
import org.apache.camel.kafkaconnector.salesforce.clients.SalesforceCliContainer;
import org.apache.camel.kafkaconnector.salesforce.clients.SfdxCommand;
import org.apache.camel.test.infra.common.TestUtils;
@@ -98,6 +97,7 @@
private volatile boolean received;
private String account;
+ private String topicName;
@Override
protected String[] getConnectorsInTest() {
@@ -109,6 +109,7 @@
received = false;
account = "TestAccount" + TestUtils.randomWithRange(1, 100);
+ topicName = getTopicForTest(this);
SfdxCommand sfdxCommand = SfdxCommand.forceDataRecordCreate()
.withArgument("-u", userName)
@@ -150,7 +151,7 @@
LOG.debug("Creating the consumer ...");
KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
- kafkaClient.consume(CamelKafkaConnectorTestUtils.getDefaultTestTopic(this.getClass()), this::checkRecord);
+ kafkaClient.consume(topicName, this::checkRecord);
LOG.debug("Created the consumer ...");
assertTrue(received, "Didn't receive any messages");
@@ -205,7 +206,7 @@
@Timeout(180)
public void testBasicConsume() throws ExecutionException, InterruptedException {
ConnectorPropertyFactory factory = CamelSalesforcePropertyFactory.basic()
- .withKafkaTopic(CamelKafkaConnectorTestUtils.getDefaultTestTopic(this.getClass()))
+ .withKafkaTopic(topicName)
.withUserName(userName)
.withPassword(password)
.withClientId(clientId)
@@ -228,7 +229,7 @@
@Timeout(180)
public void testBasicConsumeUsingUrl() throws ExecutionException, InterruptedException {
ConnectorPropertyFactory factory = CamelSalesforcePropertyFactory.basic()
- .withKafkaTopic(CamelKafkaConnectorTestUtils.getDefaultTestTopic(this.getClass()))
+ .withKafkaTopic(topicName)
.withUserName(userName)
.withPassword(password)
.withClientId(clientId)
@@ -271,7 +272,7 @@
* HTTP error 500 without much details.
*/
ConnectorPropertyFactory factory = CamelSalesforcePropertyFactory.basic()
- .withKafkaTopic(CamelKafkaConnectorTestUtils.getDefaultTestTopic(this.getClass()))
+ .withKafkaTopic(topicName)
.withUserName(userName)
.withPassword(password)
.withClientId(clientId)
@@ -289,7 +290,7 @@
@Timeout(180)
public void testBasicCDCUsingUrl() throws ExecutionException, InterruptedException {
ConnectorPropertyFactory factory = CamelSalesforcePropertyFactory.basic()
- .withKafkaTopic(CamelKafkaConnectorTestUtils.getDefaultTestTopic(this.getClass()))
+ .withKafkaTopic(topicName)
.withUserName(userName)
.withPassword(password)
.withClientId(clientId)
diff --git a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkIdempotentJMSITCase.java b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkIdempotentJMSITCase.java
index c363693..a1ce08b 100644
--- a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkIdempotentJMSITCase.java
+++ b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkIdempotentJMSITCase.java
@@ -32,7 +32,6 @@
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
-import org.apache.camel.kafkaconnector.common.utils.CamelKafkaConnectorTestUtils;
import org.apache.camel.kafkaconnector.sjms2.clients.JMSClient;
import org.apache.camel.kafkaconnector.sjms2.common.SJMS2Common;
import org.apache.camel.test.infra.common.TestUtils;
@@ -86,7 +85,7 @@
LOG.info("JMS service running at {}", jmsService.defaultEndpoint());
received = 0;
- topic = CamelKafkaConnectorTestUtils.getDefaultTestTopic(this.getClass()) + TestUtils.randomWithRange(0, 100);
+ topic = getTopicForTest(this);
destinationName = SJMS2Common.DEFAULT_JMS_QUEUE + "-" + TestUtils.randomWithRange(0, 100);
}
diff --git a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkJMSStartupITCase.java b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkJMSStartupITCase.java
index 25fe759..967f8d9 100644
--- a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkJMSStartupITCase.java
+++ b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkJMSStartupITCase.java
@@ -24,9 +24,9 @@
import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
-import org.apache.camel.kafkaconnector.common.utils.CamelKafkaConnectorTestUtils;
import org.apache.camel.kafkaconnector.sjms2.common.SJMS2Common;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.Timeout;
@@ -45,6 +45,7 @@
private boolean running;
private String trace;
+ private String topicName;
private Properties connectionProperties() {
@@ -56,6 +57,11 @@
return properties;
}
+ @BeforeEach
+ void setUp() {
+ topicName = getTopicForTest(this);
+ }
+
@Override
protected String[] getConnectorsInTest() {
return new String[] {"camel-sjms2-kafka-connector"};
@@ -82,7 +88,7 @@
KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
- kafkaClient.produce(CamelKafkaConnectorTestUtils.getDefaultTestTopic(this.getClass()), "Sink test message ");
+ kafkaClient.produce(topicName, "Sink test message ");
}
private void checkThatFailed() throws InterruptedException {
@@ -110,7 +116,7 @@
ConnectorPropertyFactory connectorPropertyFactory = CamelJMSPropertyFactory
.basic()
- .withTopics(CamelKafkaConnectorTestUtils.getDefaultTestTopic(this.getClass()))
+ .withTopics(topicName)
.withConnectionProperties(brokenProp)
.withDestinationName(SJMS2Common.DEFAULT_JMS_QUEUE)
.withDeadLetterQueueTopicName("dlq-sink-topic");
diff --git a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkWithDLQJMSITCase.java b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkWithDLQJMSITCase.java
index 583957b..5878e1d 100644
--- a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkWithDLQJMSITCase.java
+++ b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkWithDLQJMSITCase.java
@@ -23,7 +23,6 @@
import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
-import org.apache.camel.kafkaconnector.common.utils.CamelKafkaConnectorTestUtils;
import org.apache.camel.kafkaconnector.sjms2.common.SJMS2Common;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.jupiter.api.BeforeEach;
@@ -47,6 +46,7 @@
private final int expect = 10;
private int errors;
private final int expectedErrors = 1;
+ private String topicName;
private Properties connectionProperties() {
Properties properties = new Properties();
@@ -65,6 +65,7 @@
@BeforeEach
public void setUp() {
errors = 0;
+ topicName = getTopicForTest(this);
}
private <T> boolean checkDqlRecord(ConsumerRecord<String, T> record) {
@@ -88,7 +89,7 @@
KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
for (int i = 0; i < expect; i++) {
- kafkaClient.produce(CamelKafkaConnectorTestUtils.getDefaultTestTopic(this.getClass()), "Sink test message " + i);
+ kafkaClient.produce(topicName, "Sink test message " + i);
}
LOG.debug("Created the consumer ... About to receive messages");
@@ -105,7 +106,7 @@
ConnectorPropertyFactory connectorPropertyFactory = CamelJMSPropertyFactory
.basic()
- .withTopics(CamelKafkaConnectorTestUtils.getDefaultTestTopic(this.getClass()))
+ .withTopics(topicName)
.withConnectionProperties(brokenProp)
.withDestinationName(SJMS2Common.DEFAULT_JMS_QUEUE)
.withDeadLetterQueueTopicName("dlq-sink-topic");
diff --git a/tests/itests-slack/src/test/java/org/apache/camel/kafkaconnector/slack/sink/CamelSinkSlackITCase.java b/tests/itests-slack/src/test/java/org/apache/camel/kafkaconnector/slack/sink/CamelSinkSlackITCase.java
index 013fec4..b9ec30d 100644
--- a/tests/itests-slack/src/test/java/org/apache/camel/kafkaconnector/slack/sink/CamelSinkSlackITCase.java
+++ b/tests/itests-slack/src/test/java/org/apache/camel/kafkaconnector/slack/sink/CamelSinkSlackITCase.java
@@ -22,7 +22,7 @@
import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
-import org.apache.camel.kafkaconnector.common.utils.CamelKafkaConnectorTestUtils;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
@@ -46,6 +46,12 @@
private static final Logger LOG = LoggerFactory.getLogger(CamelSinkSlackITCase.class);
private String slackChannel = System.getProperty("it.test.slack.channel");
private String webhookUrl = System.getProperty("it.test.slack.webhookUrl");
+ private String topicName;
+
+ @BeforeEach
+ void setUp() {
+ topicName = getTopicForTest(this);
+ }
@Override
protected String[] getConnectorsInTest() {
@@ -58,7 +64,7 @@
KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
- kafkaClient.produce(CamelKafkaConnectorTestUtils.getDefaultTestTopic(this.getClass()), message);
+ kafkaClient.produce(topicName, message);
LOG.debug("Created the consumer ... About to receive messages");
@@ -70,7 +76,7 @@
try {
ConnectorPropertyFactory connectorPropertyFactory = CamelSlackPropertyFactory
.basic()
- .withTopics(CamelKafkaConnectorTestUtils.getDefaultTestTopic(this.getClass()))
+ .withTopics(topicName)
.withChannel(slackChannel)
.withWebhookUrl(webhookUrl);
@@ -88,7 +94,7 @@
try {
ConnectorPropertyFactory connectorPropertyFactory = CamelSlackPropertyFactory
.basic()
- .withTopics(CamelKafkaConnectorTestUtils.getDefaultTestTopic(this.getClass()))
+ .withTopics(topicName)
.withUrl(slackChannel)
.append("webhookUrl", webhookUrl)
.buildUrl();
diff --git a/tests/itests-slack/src/test/java/org/apache/camel/kafkaconnector/slack/source/CamelSourceSlackITCase.java b/tests/itests-slack/src/test/java/org/apache/camel/kafkaconnector/slack/source/CamelSourceSlackITCase.java
index 579bc45..0611216 100644
--- a/tests/itests-slack/src/test/java/org/apache/camel/kafkaconnector/slack/source/CamelSourceSlackITCase.java
+++ b/tests/itests-slack/src/test/java/org/apache/camel/kafkaconnector/slack/source/CamelSourceSlackITCase.java
@@ -22,7 +22,6 @@
import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
-import org.apache.camel.kafkaconnector.common.utils.CamelKafkaConnectorTestUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
@@ -77,7 +76,7 @@
@Test
@Timeout(90)
public void testBasicSendReceive() throws ExecutionException, InterruptedException {
- String kafkaTopic = CamelKafkaConnectorTestUtils.getDefaultTestTopic(this.getClass());
+ String kafkaTopic = getTopicForTest(this);
ConnectorPropertyFactory factory = CamelSlackPropertyFactory
.basic()
.withKafkaTopic(kafkaTopic)
diff --git a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java
index 50c5483..03ee0f3 100644
--- a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java
+++ b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java
@@ -23,7 +23,6 @@
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
import org.apache.camel.kafkaconnector.common.test.StringMessageProducer;
-import org.apache.camel.kafkaconnector.common.utils.CamelKafkaConnectorTestUtils;
import org.apache.camel.kafkaconnector.ssh.services.SshService;
import org.apache.camel.kafkaconnector.ssh.services.SshServiceFactory;
import org.junit.jupiter.api.BeforeEach;
@@ -67,7 +66,7 @@
@BeforeEach
public void setUp() {
- topic = CamelKafkaConnectorTestUtils.getDefaultTestTopic(this.getClass());
+ topic = getTopicForTest(this);
}
diff --git a/tests/perf-tests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/source/RabbitMQSourcePerformanceITCase.java b/tests/perf-tests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/source/RabbitMQSourcePerformanceITCase.java
index d5dfb15..4c232d7 100644
--- a/tests/perf-tests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/source/RabbitMQSourcePerformanceITCase.java
+++ b/tests/perf-tests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/source/RabbitMQSourcePerformanceITCase.java
@@ -22,7 +22,6 @@
import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.utils.CamelKafkaConnectorTestUtils;
import org.apache.camel.test.infra.rabbitmq.services.RabbitMQService;
import org.apache.camel.test.infra.rabbitmq.services.RabbitMQServiceFactory;
import org.junit.jupiter.api.BeforeAll;
@@ -51,7 +50,7 @@
public void testMemory() throws ExecutionException, InterruptedException {
ConnectorPropertyFactory factory = CamelRabbitMQPropertyFactory
.basic()
- .withKafkaTopic(CamelKafkaConnectorTestUtils.getDefaultTestTopic(this.getClass()))
+ .withKafkaTopic(getTopicForTest(this))
.withUrl(service.connectionProperties().hostname(), service.connectionProperties().port(),
"X.test")
.append("username", service.connectionProperties().username())