Added AWS S3 test using AWS v2
diff --git a/tests/itests-aws-v2/pom.xml b/tests/itests-aws-v2/pom.xml
index 6488cd7..dd3b24d 100644
--- a/tests/itests-aws-v2/pom.xml
+++ b/tests/itests-aws-v2/pom.xml
@@ -54,6 +54,11 @@
             <groupId>org.apache.camel</groupId>
             <artifactId>camel-aws2-sqs</artifactId>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-aws2-s3</artifactId>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/clients/AWSSDKClientUtils.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/clients/AWSSDKClientUtils.java
index 4371955..7e44b9a 100644
--- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/clients/AWSSDKClientUtils.java
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/clients/AWSSDKClientUtils.java
@@ -27,6 +27,13 @@
 import software.amazon.awssdk.regions.Region;
 import software.amazon.awssdk.services.kinesis.KinesisClient;
 import software.amazon.awssdk.services.kinesis.KinesisClientBuilder;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.S3ClientBuilder;
+import software.amazon.awssdk.services.s3.model.DeleteBucketRequest;
+import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
+import software.amazon.awssdk.services.s3.model.S3Object;
 import software.amazon.awssdk.services.sqs.SqsClient;
 import software.amazon.awssdk.services.sqs.SqsClientBuilder;
 
@@ -105,6 +112,64 @@
         return new AWSSQSClient(clientBuilder.build());
     }
 
+    public static S3Client newS3Client() {
+        LOG.debug("Creating a new S3 client");
+        S3ClientBuilder clientBuilder = S3Client.builder();
 
+        String awsInstanceType = System.getProperty("aws-service.instance.type");
+
+        clientBuilder.region(Region.US_EAST_1);
+
+        URI endpoint = getEndpoint();
+
+        if (isLocalContainer(awsInstanceType) || endpoint != null) {
+            clientBuilder.endpointOverride(endpoint);
+        }
+
+        if (isLocalContainer(awsInstanceType)) {
+            clientBuilder.credentialsProvider(TestAWSCredentialsProvider.CONTAINER_LOCAL_DEFAULT_PROVIDER);
+
+        } else {
+            clientBuilder.credentialsProvider(TestAWSCredentialsProvider.SYSTEM_PROPERTY_PROVIDER);
+        }
+
+        return clientBuilder.build();
+    }
+
+
+    /**
+     * Delete an S3 bucket using the provided client. Coming from AWS documentation:
+     * https://docs.aws.amazon.com/AmazonS3/latest/dev/Versioning.html
+     *
+     * AWS SDK v1 doc for reference:
+     * https://docs.aws.amazon.com/AmazonS3/latest/dev/delete-or-empty-bucket.html#delete-bucket-sdk-java
+     * @param s3Client the AmazonS3 client instance used to delete the bucket
+     * @param bucketName a String containing the bucket name
+     */
+    public static void deleteBucket(S3Client s3Client, String bucketName) {
+        // Delete all objects from the bucket. This is sufficient
+        // for non versioned buckets. For versioned buckets, when you attempt to delete objects, Amazon S3 inserts
+        // delete markers for all objects, but doesn't delete the object versions.
+        // To delete objects from versioned buckets, delete all of the object versions before deleting
+        // the bucket (see below for an example).
+        ListObjectsV2Request listObjectsRequest = ListObjectsV2Request.builder()
+                .bucket(bucketName)
+                .build();
+
+        ListObjectsV2Response objectListing;
+        do {
+            objectListing = s3Client.listObjectsV2(listObjectsRequest);
+
+            for (S3Object s3Object : objectListing.contents()) {
+                s3Client.deleteObject(DeleteObjectRequest.builder().bucket(bucketName).key(s3Object.key()).build());
+            }
+
+            listObjectsRequest = ListObjectsV2Request.builder().bucket(bucketName)
+                    .continuationToken(objectListing.nextContinuationToken())
+                    .build();
+        } while (objectListing.isTruncated());
+
+        s3Client.deleteBucket(DeleteBucketRequest.builder().bucket(bucketName).build());
+    }
 
 }
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/source/CamelAWSS3PropertyFactory.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/source/CamelAWSS3PropertyFactory.java
new file mode 100644
index 0000000..bb30071
--- /dev/null
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/source/CamelAWSS3PropertyFactory.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.aws.v2.s3.source;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.camel.kafkaconnector.aws.common.AWSConfigs;
+import org.apache.camel.kafkaconnector.common.EndpointUrlBuilder;
+import org.apache.camel.kafkaconnector.common.SourceConnectorPropertyFactory;
+import software.amazon.awssdk.regions.Region;
+
+
+/**
+ * Creates the set of properties used by a Camel JMS Sink Connector
+ */
+final class CamelAWSS3PropertyFactory extends SourceConnectorPropertyFactory<CamelAWSS3PropertyFactory> {
+    public static final Map<String, String> SPRING_STYLE = new HashMap<>();
+    public static final Map<String, String> KAFKA_STYLE = new HashMap<>();
+
+    static {
+        SPRING_STYLE.put(AWSConfigs.ACCESS_KEY, "camel.component.aws2-s3.accessKey");
+        SPRING_STYLE.put(AWSConfigs.SECRET_KEY, "camel.component.aws2-s3.secretKey");
+        SPRING_STYLE.put(AWSConfigs.REGION, "camel.component.aws2-s3.region");
+
+        KAFKA_STYLE.put(AWSConfigs.ACCESS_KEY, "camel.component.aws2-s3.access-key");
+        KAFKA_STYLE.put(AWSConfigs.SECRET_KEY, "camel.component.aws2-s3.secret-key");
+        KAFKA_STYLE.put(AWSConfigs.REGION, "camel.component.aws2-s3.region");
+    }
+
+    private CamelAWSS3PropertyFactory() {
+
+    }
+
+    public CamelAWSS3PropertyFactory withAmazonConfig(Properties amazonConfigs) {
+        return withAmazonConfig(amazonConfigs, this.SPRING_STYLE);
+    }
+
+    public CamelAWSS3PropertyFactory withAmazonConfig(Properties amazonConfigs, Map<String, String> style) {
+        String accessKeyKey = style.get(AWSConfigs.ACCESS_KEY);
+        String secretKeyKey = style.get(AWSConfigs.SECRET_KEY);
+        String regionKey = style.get(AWSConfigs.REGION);
+
+        setProperty(accessKeyKey,
+                amazonConfigs.getProperty(AWSConfigs.ACCESS_KEY, ""));
+        setProperty(secretKeyKey,
+                amazonConfigs.getProperty(AWSConfigs.SECRET_KEY, ""));
+        return setProperty(regionKey,
+                amazonConfigs.getProperty(AWSConfigs.REGION, Region.US_EAST_1.toString()));
+    }
+
+    public EndpointUrlBuilder<CamelAWSS3PropertyFactory> withUrl(String bucket) {
+        String queueUrl = String.format("aws2-s3://%s", bucket);
+
+        return new EndpointUrlBuilder<>(this::withSourceUrl, queueUrl);
+    }
+
+    public CamelAWSS3PropertyFactory withMaxMessagesPerPoll(int value) {
+        return setProperty("camel.source.endpoint.maxMessagesPerPoll", Integer.toString(value));
+    }
+
+    public CamelAWSS3PropertyFactory withBucketNameOrArn(String bucketNameOrArn) {
+        return setProperty("camel.source.path.bucketNameOrArn", bucketNameOrArn);
+    }
+
+    public CamelAWSS3PropertyFactory withConfiguration(String configurationClass) {
+        return setProperty("camel.component.aws2-s3.configuration", classRef(configurationClass));
+    }
+
+    public static CamelAWSS3PropertyFactory basic() {
+        return new CamelAWSS3PropertyFactory()
+                .withName("CamelAwss3SourceConnector")
+                .withTasksMax(1)
+                .withConnectorClass("org.apache.camel.kafkaconnector.aws2s3.CamelAws2s3SourceConnector")
+                .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter")
+                .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter");
+    }
+}
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/source/CamelSourceAWSS3ITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/source/CamelSourceAWSS3ITCase.java
new file mode 100644
index 0000000..aa0a828
--- /dev/null
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/source/CamelSourceAWSS3ITCase.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.aws.v2.s3.source;
+
+import java.io.File;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.camel.kafkaconnector.aws.common.AWSCommon;
+import org.apache.camel.kafkaconnector.aws.common.AWSConfigs;
+import org.apache.camel.kafkaconnector.aws.common.services.AWSService;
+import org.apache.camel.kafkaconnector.aws.v2.clients.AWSSDKClientUtils;
+import org.apache.camel.kafkaconnector.aws.v2.services.AWSServiceFactory;
+import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
+import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
+import software.amazon.awssdk.services.s3.model.PutObjectRequest;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+@Testcontainers
+public class CamelSourceAWSS3ITCase extends AbstractKafkaTest {
+
+    @RegisterExtension
+    public static AWSService<S3Client> service = AWSServiceFactory.createS3Service();
+    private static final Logger LOG = LoggerFactory.getLogger(CamelSourceAWSS3ITCase.class);
+
+    private S3Client awsS3Client;
+    private String bucketName;
+
+    private volatile int received;
+    private final int expect = 10;
+
+    @Override
+    protected String[] getConnectorsInTest() {
+        return new String[] {"camel-aws2-s3-kafka-connector"};
+    }
+
+    @BeforeEach
+    public void setUp() {
+        awsS3Client = service.getClient();
+        received = 0;
+        bucketName = AWSCommon.DEFAULT_S3_BUCKET + TestUtils.randomWithRange(0, 100);
+
+        try {
+            CreateBucketRequest request = CreateBucketRequest.builder()
+                    .bucket(bucketName)
+                    .build();
+
+            awsS3Client.createBucket(request);
+        } catch (Exception e) {
+            LOG.error("Unable to create bucket: {}", e.getMessage(), e);
+            fail("Unable to create bucket");
+        }
+    }
+
+    @AfterEach
+    public void tearDown() {
+        try {
+            AWSSDKClientUtils.deleteBucket(awsS3Client, bucketName);
+        } catch (Exception e) {
+            LOG.warn("Unable to delete bucked: {}", e.getMessage(), e);
+        }
+    }
+
+    private boolean checkRecord(ConsumerRecord<String, String> record) {
+        LOG.debug("Received: {}", record.value());
+        received++;
+
+        if (received == expect) {
+            return false;
+        }
+
+        return true;
+    }
+
+    public void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException {
+        connectorPropertyFactory.log();
+        getKafkaConnectService().initializeConnector(connectorPropertyFactory);
+
+        LOG.debug("Putting S3 objects");
+        for (int i = 0; i < expect; i++) {
+            String name = "file" + i + ".test";
+            String file = this.getClass().getResource(name).getFile();
+
+            LOG.trace("Putting file {}", file);
+            PutObjectRequest putObjectRequest = PutObjectRequest.builder()
+                    .bucket(bucketName)
+                    .key(name)
+                    .build();
+
+            awsS3Client.putObject(putObjectRequest, new File(file).toPath());
+        }
+        LOG.debug("Done putting S3S objects");
+
+        LOG.debug("Creating the consumer ...");
+        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
+        kafkaClient.consume(TestUtils.getDefaultTestTopic(this.getClass()), this::checkRecord);
+        LOG.debug("Created the consumer ...");
+
+        assertEquals(received, expect, "Didn't process the expected amount of messages");
+    }
+
+    @Test
+    @Timeout(180)
+    public void testBasicSendReceive() throws ExecutionException, InterruptedException {
+        ConnectorPropertyFactory connectorPropertyFactory = CamelAWSS3PropertyFactory
+                .basic()
+                .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
+                .withConfiguration(TestS3Configuration.class.getName())
+                .withBucketNameOrArn(bucketName)
+                .withAmazonConfig(service.getConnectionProperties());
+
+        runTest(connectorPropertyFactory);
+    }
+
+    @Test
+    @Timeout(180)
+    public void testBasicSendReceiveWithMaxMessagesPerPoll() throws ExecutionException, InterruptedException {
+        ConnectorPropertyFactory connectorPropertyFactory = CamelAWSS3PropertyFactory
+                .basic()
+                .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
+                .withConfiguration(TestS3Configuration.class.getName())
+                .withMaxMessagesPerPoll(5)
+                .withBucketNameOrArn(bucketName)
+                .withAmazonConfig(service.getConnectionProperties());
+
+        runTest(connectorPropertyFactory);
+    }
+
+    @Test
+    @Timeout(180)
+    public void testBasicSendReceiveWithKafkaStyle() throws ExecutionException, InterruptedException {
+        ConnectorPropertyFactory connectorPropertyFactory = CamelAWSS3PropertyFactory
+                .basic()
+                .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
+                .withConfiguration(TestS3Configuration.class.getName())
+                .withBucketNameOrArn(bucketName)
+                .withAmazonConfig(service.getConnectionProperties(), CamelAWSS3PropertyFactory.KAFKA_STYLE);
+
+        runTest(connectorPropertyFactory);
+    }
+
+    @Test
+    @Timeout(180)
+    public void testBasicSendReceiveUsingUrl() throws ExecutionException, InterruptedException {
+        Properties amazonProperties = service.getConnectionProperties();
+
+        ConnectorPropertyFactory connectorPropertyFactory = CamelAWSS3PropertyFactory
+                .basic()
+                .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
+                .withConfiguration(TestS3Configuration.class.getName())
+                .withUrl(bucketName)
+                    .append("accessKey", amazonProperties.getProperty(AWSConfigs.ACCESS_KEY))
+                    .append("secretKey", amazonProperties.getProperty(AWSConfigs.SECRET_KEY))
+                    .appendIfAvailable("proxyProtocol", amazonProperties.getProperty(AWSConfigs.PROTOCOL))
+                    .append("region", amazonProperties.getProperty(AWSConfigs.REGION, Region.US_EAST_1.id()))
+                .buildUrl();
+
+        runTest(connectorPropertyFactory);
+    }
+
+}
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/source/TestS3Configuration.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/source/TestS3Configuration.java
new file mode 100644
index 0000000..9898f74
--- /dev/null
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/source/TestS3Configuration.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.aws.v2.s3.source;
+
+import org.apache.camel.component.aws2.s3.AWS2S3Configuration;
+import org.apache.camel.kafkaconnector.aws.v2.clients.AWSSDKClientUtils;
+import software.amazon.awssdk.services.s3.S3Client;
+
+public class TestS3Configuration extends AWS2S3Configuration {
+    private S3Client s3Client;
+
+    private S3Client buildClient() {
+        return AWSSDKClientUtils.newS3Client();
+    }
+
+    @Override
+    public S3Client getAmazonS3Client() {
+        if (s3Client == null) {
+            s3Client = buildClient();
+        }
+
+        return s3Client;
+    }
+}
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/services/AWSS3LocalContainerService.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/services/AWSS3LocalContainerService.java
new file mode 100644
index 0000000..c9bef95
--- /dev/null
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/services/AWSS3LocalContainerService.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.aws.v2.services;
+
+import org.apache.camel.kafkaconnector.aws.v2.common.TestAWSCredentialsProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3Client;
+
+public class AWSS3LocalContainerService extends AWSLocalContainerService<S3Client> {
+    private static final Logger LOG = LoggerFactory.getLogger(AWSS3LocalContainerService.class);
+
+    public AWSS3LocalContainerService() {
+        super(Service.S3);
+
+        LOG.info("Initializing the local AWS services");
+        getContainer().start();
+    }
+
+
+    @Override
+    public S3Client getClient() {
+        Region region = Region.US_EAST_1;
+
+        return S3Client.builder()
+                .region(region)
+                .credentialsProvider(TestAWSCredentialsProvider.CONTAINER_LOCAL_DEFAULT_PROVIDER)
+                .endpointOverride(getServiceEndpoint())
+                .build();
+    }
+}
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/services/AWSServiceFactory.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/services/AWSServiceFactory.java
index e02ef2e..67c56e4 100644
--- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/services/AWSServiceFactory.java
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/services/AWSServiceFactory.java
@@ -24,6 +24,7 @@
 import org.slf4j.LoggerFactory;
 import software.amazon.awssdk.core.SdkSystemSetting;
 import software.amazon.awssdk.services.kinesis.KinesisClient;
+import software.amazon.awssdk.services.s3.S3Client;
 
 public final class AWSServiceFactory {
     private static final Logger LOG = LoggerFactory.getLogger(AWSServiceFactory.class);
@@ -74,4 +75,21 @@
                 awsInstanceType);
         throw new UnsupportedOperationException("Invalid AWS instance type");
     }
+
+    public static AWSService<S3Client> createS3Service() {
+        String awsInstanceType = System.getProperty("aws-service.instance.type");
+        LOG.info("Creating a {} AWS S3 instance", awsInstanceType);
+
+        if (awsInstanceType == null || awsInstanceType.equals("local-aws-container")) {
+            return new AWSS3LocalContainerService();
+        }
+
+        if (awsInstanceType.equals("remote")) {
+            return new AWSRemoteService<>(AWSSDKClientUtils::newS3Client);
+        }
+
+        LOG.error("Invalid AWS instance type: {}. Must be either 'remote' or 'local-aws-container'",
+                awsInstanceType);
+        throw new UnsupportedOperationException("Invalid AWS instance type");
+    }
 }
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/services/Service.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/services/Service.java
index c236cd7..8c6a3b3 100644
--- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/services/Service.java
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/services/Service.java
@@ -19,7 +19,8 @@
 
 public enum Service {
     KINESIS("kinesis"),
-    SQS("sqs");
+    SQS("sqs"),
+    S3("s3");
 
     private final String serviceName;
 
diff --git a/tests/itests-aws-v2/src/test/resources/org/apache/camel/kafkaconnector/aws/v2/s3/source/file0.test b/tests/itests-aws-v2/src/test/resources/org/apache/camel/kafkaconnector/aws/v2/s3/source/file0.test
new file mode 100644
index 0000000..fc590f9
--- /dev/null
+++ b/tests/itests-aws-v2/src/test/resources/org/apache/camel/kafkaconnector/aws/v2/s3/source/file0.test
@@ -0,0 +1,14 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
\ No newline at end of file
diff --git a/tests/itests-aws-v2/src/test/resources/org/apache/camel/kafkaconnector/aws/v2/s3/source/file1.test b/tests/itests-aws-v2/src/test/resources/org/apache/camel/kafkaconnector/aws/v2/s3/source/file1.test
new file mode 100644
index 0000000..fc590f9
--- /dev/null
+++ b/tests/itests-aws-v2/src/test/resources/org/apache/camel/kafkaconnector/aws/v2/s3/source/file1.test
@@ -0,0 +1,14 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
\ No newline at end of file
diff --git a/tests/itests-aws-v2/src/test/resources/org/apache/camel/kafkaconnector/aws/v2/s3/source/file2.test b/tests/itests-aws-v2/src/test/resources/org/apache/camel/kafkaconnector/aws/v2/s3/source/file2.test
new file mode 100644
index 0000000..fc590f9
--- /dev/null
+++ b/tests/itests-aws-v2/src/test/resources/org/apache/camel/kafkaconnector/aws/v2/s3/source/file2.test
@@ -0,0 +1,14 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
\ No newline at end of file
diff --git a/tests/itests-aws-v2/src/test/resources/org/apache/camel/kafkaconnector/aws/v2/s3/source/file3.test b/tests/itests-aws-v2/src/test/resources/org/apache/camel/kafkaconnector/aws/v2/s3/source/file3.test
new file mode 100644
index 0000000..fc590f9
--- /dev/null
+++ b/tests/itests-aws-v2/src/test/resources/org/apache/camel/kafkaconnector/aws/v2/s3/source/file3.test
@@ -0,0 +1,14 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
\ No newline at end of file
diff --git a/tests/itests-aws-v2/src/test/resources/org/apache/camel/kafkaconnector/aws/v2/s3/source/file4.test b/tests/itests-aws-v2/src/test/resources/org/apache/camel/kafkaconnector/aws/v2/s3/source/file4.test
new file mode 100644
index 0000000..fc590f9
--- /dev/null
+++ b/tests/itests-aws-v2/src/test/resources/org/apache/camel/kafkaconnector/aws/v2/s3/source/file4.test
@@ -0,0 +1,14 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
\ No newline at end of file
diff --git a/tests/itests-aws-v2/src/test/resources/org/apache/camel/kafkaconnector/aws/v2/s3/source/file5.test b/tests/itests-aws-v2/src/test/resources/org/apache/camel/kafkaconnector/aws/v2/s3/source/file5.test
new file mode 100644
index 0000000..fc590f9
--- /dev/null
+++ b/tests/itests-aws-v2/src/test/resources/org/apache/camel/kafkaconnector/aws/v2/s3/source/file5.test
@@ -0,0 +1,14 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
\ No newline at end of file
diff --git a/tests/itests-aws-v2/src/test/resources/org/apache/camel/kafkaconnector/aws/v2/s3/source/file6.test b/tests/itests-aws-v2/src/test/resources/org/apache/camel/kafkaconnector/aws/v2/s3/source/file6.test
new file mode 100644
index 0000000..fc590f9
--- /dev/null
+++ b/tests/itests-aws-v2/src/test/resources/org/apache/camel/kafkaconnector/aws/v2/s3/source/file6.test
@@ -0,0 +1,14 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
\ No newline at end of file
diff --git a/tests/itests-aws-v2/src/test/resources/org/apache/camel/kafkaconnector/aws/v2/s3/source/file7.test b/tests/itests-aws-v2/src/test/resources/org/apache/camel/kafkaconnector/aws/v2/s3/source/file7.test
new file mode 100644
index 0000000..fc590f9
--- /dev/null
+++ b/tests/itests-aws-v2/src/test/resources/org/apache/camel/kafkaconnector/aws/v2/s3/source/file7.test
@@ -0,0 +1,14 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
\ No newline at end of file
diff --git a/tests/itests-aws-v2/src/test/resources/org/apache/camel/kafkaconnector/aws/v2/s3/source/file8.test b/tests/itests-aws-v2/src/test/resources/org/apache/camel/kafkaconnector/aws/v2/s3/source/file8.test
new file mode 100644
index 0000000..fc590f9
--- /dev/null
+++ b/tests/itests-aws-v2/src/test/resources/org/apache/camel/kafkaconnector/aws/v2/s3/source/file8.test
@@ -0,0 +1,14 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
\ No newline at end of file
diff --git a/tests/itests-aws-v2/src/test/resources/org/apache/camel/kafkaconnector/aws/v2/s3/source/file9.test b/tests/itests-aws-v2/src/test/resources/org/apache/camel/kafkaconnector/aws/v2/s3/source/file9.test
new file mode 100644
index 0000000..fc590f9
--- /dev/null
+++ b/tests/itests-aws-v2/src/test/resources/org/apache/camel/kafkaconnector/aws/v2/s3/source/file9.test
@@ -0,0 +1,14 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
\ No newline at end of file