CASSANDRASC-85: Expose TTL option for the create snapshot endpoint

Patch by Francisco Guerrero; Reviewed by Yifan Cai for CASSANDRASC-85
diff --git a/CHANGES.txt b/CHANGES.txt
index f8a4f87..d61a23f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 1.0.0
 -----
+ * Expose TTL option for the create snapshot endpoint (CASSANDRASC-85)
  * Allow DriverUtils to be pluggable (CASSANDRASC-88)
  * Add JMX health checks during the periodic health checks (CASSANDRASC-87)
  * Sidecar should be able to load metadata even if the local instance is unavailable (CASSANDRASC-79)
diff --git a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraAdapter.java b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraAdapter.java
index 5caab2e..bd39c5b 100644
--- a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraAdapter.java
+++ b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraAdapter.java
@@ -48,9 +48,9 @@
     private static final Logger LOGGER = LoggerFactory.getLogger(CassandraAdapter.class);
     protected final DnsResolver dnsResolver;
     protected final JmxClient jmxClient;
-    private final CQLSessionProvider cqlSessionProvider;
-    private final InetSocketAddress localNativeTransportAddress;
-    private final DriverUtils driverUtils;
+    protected final CQLSessionProvider cqlSessionProvider;
+    protected final InetSocketAddress localNativeTransportAddress;
+    protected final DriverUtils driverUtils;
     private volatile Host host;
 
     public CassandraAdapter(DnsResolver dnsResolver,
@@ -127,7 +127,7 @@
         return activeSession.execute(statement);
     }
 
-    private Host getHost(Metadata metadata)
+    protected Host getHost(Metadata metadata)
     {
         if (host == null)
         {
diff --git a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStorageOperations.java b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStorageOperations.java
index ce14d1e..e6f12f1 100644
--- a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStorageOperations.java
+++ b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStorageOperations.java
@@ -80,12 +80,40 @@
     }
 
     /**
-     * {@inheritDoc}
+     * Takes the snapshot of a multiple column family from different keyspaces. A snapshot name must be specified.
+     * It logs a warning when the {@code ttl} option is provided as the option is unsupported.
+     *
+     * @param tag      the tag given to the snapshot; may not be null or empty
+     * @param keyspace the keyspace in the Cassandra database to use for the snapshot
+     * @param table    the table in the Cassandra database to use for the snapshot
+     * @param options  map of options, for example ttl, skipFlush
      */
     @Override
     public void takeSnapshot(@NotNull String tag, @NotNull String keyspace, @NotNull String table,
                              @Nullable Map<String, String> options)
     {
+        if (options != null && options.containsKey("ttl"))
+        {
+            LOGGER.warn("The ttl option is not supported in Cassandra 4.0");
+        }
+
+        takeSnapshotInternal(tag, keyspace, table, options);
+    }
+
+    /**
+     * Actually performs the take snapshot operation of a multiple column family from different keyspaces.
+     * A snapshot name must be specified.
+     *
+     * @param tag      the tag given to the snapshot; may not be null or empty
+     * @param keyspace the keyspace in the Cassandra database to use for the snapshot
+     * @param table    the table in the Cassandra database to use for the snapshot
+     * @param options  map of options, for example ttl, skipFlush
+     */
+    protected void takeSnapshotInternal(@NotNull String tag,
+                                        @NotNull String keyspace,
+                                        @NotNull String table,
+                                        @Nullable Map<String, String> options)
+    {
         requireNonNull(tag, "snapshot tag must be non-null");
         requireNonNull(keyspace, "keyspace for the  must be non-null");
         requireNonNull(table, "table must be non-null");
diff --git a/adapters/cassandra41/build.gradle b/adapters/cassandra41/build.gradle
new file mode 100644
index 0000000..53885a9
--- /dev/null
+++ b/adapters/cassandra41/build.gradle
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.nio.file.Paths
+
+plugins {
+    id 'java-library'
+    id 'idea'
+    id 'maven-publish'
+    id "com.github.spotbugs"
+}
+
+group 'org.apache.cassandra.sidecar'
+
+version project.version
+
+sourceCompatibility = 1.8
+
+repositories {
+    mavenCentral()
+}
+
+test {
+    useJUnitPlatform()
+    maxParallelForks = Runtime.runtime.availableProcessors().intdiv(2) ?: 1
+    reports {
+        junitXml.enabled = true
+        def destDir = Paths.get(rootProject.rootDir.absolutePath, "build", "test-results", "adapters-cassandra41").toFile()
+        println("Destination directory for adapters-cassandra41 tests: ${destDir}")
+        junitXml.destination = destDir
+        html.enabled = true
+    }
+}
+
+dependencies {
+    api(project(":common"))
+    api(project(":adapters:base"))
+    compileOnly('org.jetbrains:annotations:23.0.0')
+    compileOnly('com.datastax.cassandra:cassandra-driver-core:3.11.3')
+    implementation("org.slf4j:slf4j-api:${project.slf4jVersion}")
+}
+
+publishing {
+    publications {
+        maven(MavenPublication) {
+            from components.java
+            groupId project.group
+            artifactId "adapters-cassandra41"
+            version System.getenv("CODE_VERSION") ?: "${version}"
+        }
+    }
+}
diff --git a/adapters/cassandra41/src/main/java/org/apache/cassandra/sidecar/adapters/cassandra41/Cassandra41Adapter.java b/adapters/cassandra41/src/main/java/org/apache/cassandra/sidecar/adapters/cassandra41/Cassandra41Adapter.java
new file mode 100644
index 0000000..b643b35
--- /dev/null
+++ b/adapters/cassandra41/src/main/java/org/apache/cassandra/sidecar/adapters/cassandra41/Cassandra41Adapter.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.adapters.cassandra41;
+
+import java.net.InetSocketAddress;
+
+import org.apache.cassandra.sidecar.adapters.base.CassandraAdapter;
+import org.apache.cassandra.sidecar.common.CQLSessionProvider;
+import org.apache.cassandra.sidecar.common.ICassandraAdapter;
+import org.apache.cassandra.sidecar.common.JmxClient;
+import org.apache.cassandra.sidecar.common.StorageOperations;
+import org.apache.cassandra.sidecar.common.dns.DnsResolver;
+import org.apache.cassandra.sidecar.common.utils.DriverUtils;
+
+/**
+ * A {@link ICassandraAdapter} implementation for Cassandra 4.1 and later
+ */
+public class Cassandra41Adapter extends CassandraAdapter
+{
+    public Cassandra41Adapter(DnsResolver dnsResolver,
+                              JmxClient jmxClient,
+                              CQLSessionProvider session,
+                              InetSocketAddress localNativeTransportAddress,
+                              DriverUtils driverUtils)
+    {
+        super(dnsResolver, jmxClient, session, localNativeTransportAddress, driverUtils);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public StorageOperations storageOperations()
+    {
+        return new Cassandra41StorageOperations(jmxClient, dnsResolver);
+    }
+}
diff --git a/adapters/cassandra41/src/main/java/org/apache/cassandra/sidecar/adapters/cassandra41/Cassandra41Factory.java b/adapters/cassandra41/src/main/java/org/apache/cassandra/sidecar/adapters/cassandra41/Cassandra41Factory.java
new file mode 100644
index 0000000..0bba5d7
--- /dev/null
+++ b/adapters/cassandra41/src/main/java/org/apache/cassandra/sidecar/adapters/cassandra41/Cassandra41Factory.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.adapters.cassandra41;
+
+import java.net.InetSocketAddress;
+
+import org.apache.cassandra.sidecar.common.CQLSessionProvider;
+import org.apache.cassandra.sidecar.common.ICassandraAdapter;
+import org.apache.cassandra.sidecar.common.ICassandraFactory;
+import org.apache.cassandra.sidecar.common.JmxClient;
+import org.apache.cassandra.sidecar.common.MinimumVersion;
+import org.apache.cassandra.sidecar.common.dns.DnsResolver;
+import org.apache.cassandra.sidecar.common.utils.DriverUtils;
+
+/**
+ * Factory to produce the 4.1 adapter
+ */
+@MinimumVersion("4.1.0")
+public class Cassandra41Factory implements ICassandraFactory
+{
+    protected final DnsResolver dnsResolver;
+    private final DriverUtils driverUtils;
+
+    public Cassandra41Factory(DnsResolver dnsResolver, DriverUtils driverUtils)
+    {
+        this.dnsResolver = dnsResolver;
+        this.driverUtils = driverUtils;
+    }
+
+    /**
+     * Returns a new adapter for Cassandra 4.0 clusters.
+     *
+     * @param session                     the session to the Cassandra database
+     * @param jmxClient                   the JMX client to connect to the Cassandra database
+     * @param localNativeTransportAddress the native transport address and port of the instance
+     * @return a new adapter for the 4.0 clusters
+     */
+    @Override
+    public ICassandraAdapter create(CQLSessionProvider session, JmxClient jmxClient,
+                                    InetSocketAddress localNativeTransportAddress)
+    {
+        return new Cassandra41Adapter(dnsResolver, jmxClient, session, localNativeTransportAddress, driverUtils);
+    }
+}
diff --git a/adapters/cassandra41/src/main/java/org/apache/cassandra/sidecar/adapters/cassandra41/Cassandra41StorageOperations.java b/adapters/cassandra41/src/main/java/org/apache/cassandra/sidecar/adapters/cassandra41/Cassandra41StorageOperations.java
new file mode 100644
index 0000000..928355a
--- /dev/null
+++ b/adapters/cassandra41/src/main/java/org/apache/cassandra/sidecar/adapters/cassandra41/Cassandra41StorageOperations.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.adapters.cassandra41;
+
+import java.util.Map;
+
+import org.apache.cassandra.sidecar.adapters.base.CassandraStorageOperations;
+import org.apache.cassandra.sidecar.adapters.base.RingProvider;
+import org.apache.cassandra.sidecar.adapters.base.TokenRangeReplicaProvider;
+import org.apache.cassandra.sidecar.common.JmxClient;
+import org.apache.cassandra.sidecar.common.StorageOperations;
+import org.apache.cassandra.sidecar.common.dns.DnsResolver;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * An implementation of the {@link StorageOperations} that interfaces with Cassandra 4.1 and later
+ */
+public class Cassandra41StorageOperations extends CassandraStorageOperations
+{
+    /**
+     * Creates a new instance with the provided {@link JmxClient} and {@link DnsResolver}
+     *
+     * @param jmxClient   the JMX client used to communicate with the Cassandra instance
+     * @param dnsResolver the DNS resolver used to lookup replicas
+     */
+    public Cassandra41StorageOperations(JmxClient jmxClient, DnsResolver dnsResolver)
+    {
+        super(jmxClient, dnsResolver);
+    }
+
+    /**
+     * Creates a new instances with the provided {@link JmxClient}, {@link RingProvider}, and
+     * {@link TokenRangeReplicaProvider}. This constructor is exposed for extensibility.
+     *
+     * @param jmxClient                 the JMX client used to communicate with the Cassandra instance
+     * @param ringProvider              the ring provider instance
+     * @param tokenRangeReplicaProvider the token range replica provider
+     */
+    public Cassandra41StorageOperations(JmxClient jmxClient,
+                                        RingProvider ringProvider,
+                                        TokenRangeReplicaProvider tokenRangeReplicaProvider)
+    {
+        super(jmxClient, ringProvider, tokenRangeReplicaProvider);
+    }
+
+    @Override
+    public void takeSnapshot(@NotNull String tag,
+                             @NotNull String keyspace,
+                             @NotNull String table,
+                             @Nullable Map<String, String> options)
+    {
+        super.takeSnapshotInternal(tag, keyspace, table, options);
+    }
+}
diff --git a/build.gradle b/build.gradle
index 2e0743d..1eadde1 100644
--- a/build.gradle
+++ b/build.gradle
@@ -212,6 +212,7 @@
 
     implementation(project(":common"))
     implementation(project(":adapters:base"))
+    implementation(project(":adapters:cassandra41"))
 
     testFixturesApi(testFixtures(project(":common")))
     testFixturesImplementation("io.vertx:vertx-junit5:${project.vertxVersion}")
@@ -227,6 +228,9 @@
     integrationTestImplementation(group: 'org.apache.cassandra', name: 'dtest-api', version: "${dtestApiVersion}")
     // Needed by the Cassandra dtest framework
     integrationTestImplementation("org.junit.vintage:junit-vintage-engine:${junitVersion}")
+    // Needed for snapshot manifest validation
+    integrationTestImplementation(group: 'com.fasterxml.jackson.datatype', name: 'jackson-datatype-jsr310', version: "${project.jacksonVersion}")
+
 }
 
 jar {
diff --git a/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java b/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java
index b7c7e82..9433e3a 100644
--- a/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java
+++ b/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java
@@ -45,6 +45,7 @@
 import org.apache.cassandra.sidecar.client.selection.InstanceSelectionPolicy;
 import org.apache.cassandra.sidecar.client.selection.SingleInstanceSelectionPolicy;
 import org.apache.cassandra.sidecar.common.utils.HttpRange;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * The context for a given request that include the {@link InstanceSelectionPolicy}, the {@link RetryPolicy}, and
@@ -368,11 +369,16 @@
          * @param keyspace     the keyspace in Cassandra
          * @param tableName    the table name in Cassandra
          * @param snapshotName the name of the snapshot
+         * @param snapshotTTL  an optional time to live option for the snapshot (available since Cassandra 4.1+)
+         *                     The TTL option must specify the units, for example 2d represents a TTL for 2 days;
+         *                     1h represents a TTL of 1 hour, etc. Valid units are {@code d}, {@code h}, {@code s},
+         *                     {@code ms}, {@code us}, {@code µs}, {@code ns}, and {@code m}.
          * @return a reference to this Builder
          */
-        public Builder createSnapshotRequest(String keyspace, String tableName, String snapshotName)
+        public Builder createSnapshotRequest(String keyspace, String tableName, String snapshotName,
+                                             @Nullable String snapshotTTL)
         {
-            return request(new CreateSnapshotRequest(keyspace, tableName, snapshotName));
+            return request(new CreateSnapshotRequest(keyspace, tableName, snapshotName, snapshotTTL));
         }
 
         /**
diff --git a/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java b/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java
index 90fd47f..7e5dbbe 100644
--- a/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java
+++ b/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java
@@ -43,6 +43,7 @@
 import org.apache.cassandra.sidecar.common.data.TimeSkewResponse;
 import org.apache.cassandra.sidecar.common.data.TokenRangeReplicasResponse;
 import org.apache.cassandra.sidecar.common.utils.HttpRange;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * The SidecarClient class to perform requests
@@ -313,9 +314,32 @@
                                                   String table,
                                                   String snapshotName)
     {
+        return createSnapshot(instance, keyspace, table, snapshotName, null);
+    }
+
+    /**
+     * Executes the create snapshot request using the default retry policy and provided {@code instance}
+     *
+     * @param instance     the instance where the request will be executed
+     * @param keyspace     the keyspace in Cassandra
+     * @param table        the table name in Cassandra
+     * @param snapshotName the name of the snapshot
+     * @param snapshotTTL  an optional time to live option for the snapshot (available since Cassandra 4.1+).
+     *                     The TTL option must specify the units, for example 2d represents a TTL for 2 days;
+     *                     1h represents a TTL of 1 hour, etc. Valid units are {@code d}, {@code h}, {@code s},
+     *                     {@code ms}, {@code us}, {@code µs}, {@code ns}, and {@code m}.
+     * @return a completable future for the request
+     */
+    public CompletableFuture<Void> createSnapshot(SidecarInstance instance,
+                                                  String keyspace,
+                                                  String table,
+                                                  String snapshotName,
+                                                  @Nullable String snapshotTTL)
+    {
         return executor.executeRequestAsync(requestBuilder().retryPolicy(ignoreConflictRetryPolicy)
                                                             .singleInstanceSelectionPolicy(instance)
-                                                            .createSnapshotRequest(keyspace, table, snapshotName)
+                                                            .createSnapshotRequest(keyspace, table,
+                                                                                   snapshotName, snapshotTTL)
                                                             .build());
     }
 
diff --git a/client/src/main/java/org/apache/cassandra/sidecar/client/request/CreateSnapshotRequest.java b/client/src/main/java/org/apache/cassandra/sidecar/client/request/CreateSnapshotRequest.java
index 4da2766..69f7d87 100644
--- a/client/src/main/java/org/apache/cassandra/sidecar/client/request/CreateSnapshotRequest.java
+++ b/client/src/main/java/org/apache/cassandra/sidecar/client/request/CreateSnapshotRequest.java
@@ -19,6 +19,7 @@
 package org.apache.cassandra.sidecar.client.request;
 
 import io.netty.handler.codec.http.HttpMethod;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Represents a request to create a snapshot
@@ -32,10 +33,14 @@
      * @param keyspace     the keyspace in Cassandra
      * @param table        the table name in Cassandra
      * @param snapshotName the name of the snapshot
+     * @param snapshotTTL  an optional time to live option for the snapshot (available since Cassandra 4.1+)
+     *                     The TTL option must specify the units, for example 2d represents a TTL for 2 days;
+     *                     1h represents a TTL of 1 hour, etc. Valid units are {@code d}, {@code h}, {@code s},
+     *                     {@code ms}, {@code us}, {@code µs}, {@code ns}, and {@code m}.
      */
-    public CreateSnapshotRequest(String keyspace, String table, String snapshotName)
+    public CreateSnapshotRequest(String keyspace, String table, String snapshotName, @Nullable String snapshotTTL)
     {
-        super(keyspace, table, snapshotName);
+        super(keyspace, table, snapshotName, false, snapshotTTL);
     }
 
     /**
diff --git a/client/src/main/java/org/apache/cassandra/sidecar/client/request/ListSnapshotFilesRequest.java b/client/src/main/java/org/apache/cassandra/sidecar/client/request/ListSnapshotFilesRequest.java
index 2a05e17..701d12f 100644
--- a/client/src/main/java/org/apache/cassandra/sidecar/client/request/ListSnapshotFilesRequest.java
+++ b/client/src/main/java/org/apache/cassandra/sidecar/client/request/ListSnapshotFilesRequest.java
@@ -40,7 +40,7 @@
                                     String snapshotName,
                                     boolean includeSecondaryIndexFiles)
     {
-        super(keyspace, table, snapshotName, includeSecondaryIndexFiles);
+        super(keyspace, table, snapshotName, includeSecondaryIndexFiles, null);
     }
 
     /**
diff --git a/client/src/main/java/org/apache/cassandra/sidecar/client/request/SnapshotRequest.java b/client/src/main/java/org/apache/cassandra/sidecar/client/request/SnapshotRequest.java
index 6e2d174..c3cdc67 100644
--- a/client/src/main/java/org/apache/cassandra/sidecar/client/request/SnapshotRequest.java
+++ b/client/src/main/java/org/apache/cassandra/sidecar/client/request/SnapshotRequest.java
@@ -19,6 +19,7 @@
 package org.apache.cassandra.sidecar.client.request;
 
 import org.apache.cassandra.sidecar.common.ApiEndpointsV1;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * An abstract class that access the {@link ApiEndpointsV1#SNAPSHOTS_ROUTE}
@@ -29,26 +30,40 @@
 {
     SnapshotRequest(String keyspace, String table, String snapshotName)
     {
-        super(requestURI(keyspace, table, snapshotName, false));
+        super(requestURI(keyspace, table, snapshotName, false, null));
     }
 
-    SnapshotRequest(String keyspace, String table, String snapshotName, boolean includeSecondaryIndexFiles)
+    SnapshotRequest(String keyspace, String table, String snapshotName, boolean includeSecondaryIndexFiles,
+                    @Nullable String snapshotTTL)
     {
-        super(requestURI(keyspace, table, snapshotName, includeSecondaryIndexFiles));
+        super(requestURI(keyspace, table, snapshotName, includeSecondaryIndexFiles, snapshotTTL));
     }
 
-    static String requestURI(String keyspace, String tableName, String snapshotName, boolean includeSecondaryIndexFiles)
+    static String requestURI(String keyspace, String tableName, String snapshotName,
+                             boolean includeSecondaryIndexFiles, @Nullable String snapshotTTL)
     {
         String requestUri = ApiEndpointsV1.SNAPSHOTS_ROUTE
                             .replaceAll(ApiEndpointsV1.KEYSPACE_PATH_PARAM, keyspace)
                             .replaceAll(ApiEndpointsV1.TABLE_PATH_PARAM, tableName)
                             .replaceAll(ApiEndpointsV1.SNAPSHOT_PATH_PARAM, snapshotName);
 
-        if (!includeSecondaryIndexFiles)
+        if (!includeSecondaryIndexFiles && snapshotTTL == null)
         {
             return requestUri;
         }
 
-        return requestUri + "?includeSecondaryIndexFiles=true";
+        requestUri = requestUri + '?';
+
+        if (includeSecondaryIndexFiles)
+        {
+            requestUri = requestUri + "includeSecondaryIndexFiles=true";
+        }
+
+        if (snapshotTTL != null)
+        {
+            requestUri = requestUri + "ttl=" + snapshotTTL;
+        }
+
+        return requestUri;
     }
 }
diff --git a/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java b/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
index d47e460..17e0d9f 100644
--- a/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
+++ b/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
@@ -558,6 +558,27 @@
     }
 
     @Test
+    void testCreateSnapshotWithTTL() throws Exception
+    {
+        MockResponse response = new MockResponse().setResponseCode(OK.code());
+        SidecarInstanceImpl sidecarInstance = instances.get(3);
+        MockWebServer mockWebServer = servers.get(3);
+        mockWebServer.enqueue(response);
+
+        client.createSnapshot(sidecarInstance, "cycling", "cyclist_name", "2023.04.11", "2d")
+              .get(30, TimeUnit.SECONDS);
+
+        assertThat(mockWebServer.getRequestCount()).isEqualTo(1);
+        RecordedRequest request = mockWebServer.takeRequest();
+        String expected = ApiEndpointsV1.SNAPSHOTS_ROUTE
+                          .replaceAll(ApiEndpointsV1.KEYSPACE_PATH_PARAM, "cycling")
+                          .replaceAll(ApiEndpointsV1.TABLE_PATH_PARAM, "cyclist_name")
+                          .replaceAll(ApiEndpointsV1.SNAPSHOT_PATH_PARAM, "2023.04.11") + "?ttl=2d";
+        assertThat(request.getPath()).isEqualTo(expected);
+        assertThat(request.getMethod()).isEqualTo("PUT");
+    }
+
+    @Test
     void testCleanUploadSession() throws Exception
     {
         MockResponse response = new MockResponse().setResponseCode(OK.code());
diff --git a/common/src/main/java/org/apache/cassandra/sidecar/common/ICassandraFactory.java b/common/src/main/java/org/apache/cassandra/sidecar/common/ICassandraFactory.java
index e8581d6..928a23e 100644
--- a/common/src/main/java/org/apache/cassandra/sidecar/common/ICassandraFactory.java
+++ b/common/src/main/java/org/apache/cassandra/sidecar/common/ICassandraFactory.java
@@ -31,7 +31,7 @@
      * @param session                     the session to the Cassandra database
      * @param client                      the JMX client to connect to the Cassandra database
      * @param localNativeTransportAddress the native transport address and port of the instance
-     * @return an {@link ICassandraAdapter} implementation for the instance provdied
+     * @return an {@link ICassandraAdapter} implementation for the instance provided
      */
     ICassandraAdapter create(CQLSessionProvider session,
                              JmxClient client,
diff --git a/settings.gradle b/settings.gradle
index 1eb9963..60899a3 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -20,6 +20,7 @@
 rootProject.name = "cassandra-sidecar"
 
 include "adapters:base"
+include "adapters:cassandra41"
 include "common"
 include "client"
 include "vertx-client"
diff --git a/src/main/java/org/apache/cassandra/sidecar/data/SSTableImportRequest.java b/src/main/java/org/apache/cassandra/sidecar/data/SSTableImportRequest.java
index d917f42..5c5d9d1 100644
--- a/src/main/java/org/apache/cassandra/sidecar/data/SSTableImportRequest.java
+++ b/src/main/java/org/apache/cassandra/sidecar/data/SSTableImportRequest.java
@@ -25,7 +25,7 @@
 import org.apache.cassandra.sidecar.common.data.QualifiedTableName;
 import org.apache.cassandra.sidecar.common.data.SSTableUploads;
 
-import static org.apache.cassandra.sidecar.utils.RequestUtils.parseBooleanHeader;
+import static org.apache.cassandra.sidecar.utils.RequestUtils.parseBooleanQueryParam;
 
 /**
  * Holder class for the {@code org.apache.cassandra.sidecar.routes.SSTableUploadsResource}
@@ -186,12 +186,12 @@
         HttpServerRequest request = context.request();
         return new SSTableImportRequest(qualifiedTableName,
                                         context.pathParam("uploadId"),
-                                        parseBooleanHeader(request, "resetLevel", true),
-                                        parseBooleanHeader(request, "clearRepaired", true),
-                                        parseBooleanHeader(request, "verifySSTables", true),
-                                        parseBooleanHeader(request, "verifyTokens", true),
-                                        parseBooleanHeader(request, "invalidateCaches", true),
-                                        parseBooleanHeader(request, "extendedVerify", true),
-                                        parseBooleanHeader(request, "copyData", false));
+                                        parseBooleanQueryParam(request, "resetLevel", true),
+                                        parseBooleanQueryParam(request, "clearRepaired", true),
+                                        parseBooleanQueryParam(request, "verifySSTables", true),
+                                        parseBooleanQueryParam(request, "verifyTokens", true),
+                                        parseBooleanQueryParam(request, "invalidateCaches", true),
+                                        parseBooleanQueryParam(request, "extendedVerify", true),
+                                        parseBooleanQueryParam(request, "copyData", false));
     }
 }
diff --git a/src/main/java/org/apache/cassandra/sidecar/data/SnapshotRequest.java b/src/main/java/org/apache/cassandra/sidecar/data/SnapshotRequest.java
index b8d5aed..5827f18 100644
--- a/src/main/java/org/apache/cassandra/sidecar/data/SnapshotRequest.java
+++ b/src/main/java/org/apache/cassandra/sidecar/data/SnapshotRequest.java
@@ -30,6 +30,7 @@
     private final String snapshotName;
     private final boolean includeSecondaryIndexFiles;
     private final QualifiedTableName qualifiedTableName;
+    private final String ttl;
 
     /**
      * Constructor for the holder class
@@ -38,22 +39,26 @@
      * @param tableName                  the table name in Cassandra
      * @param snapshotName               the name of the snapshot
      * @param includeSecondaryIndexFiles true if secondary index files are allowed, false otherwise
+     * @param ttl                        an optional TTL for snapshot creation
      */
     public SnapshotRequest(String keyspace,
                            String tableName,
                            String snapshotName,
-                           boolean includeSecondaryIndexFiles)
+                           boolean includeSecondaryIndexFiles,
+                           String ttl)
     {
-        this(new QualifiedTableName(keyspace, tableName, true), snapshotName, includeSecondaryIndexFiles);
+        this(new QualifiedTableName(keyspace, tableName, true), snapshotName, includeSecondaryIndexFiles, ttl);
     }
 
     public SnapshotRequest(QualifiedTableName qualifiedTableName,
                            String snapshotName,
-                           boolean includeSecondaryIndexFiles)
+                           boolean includeSecondaryIndexFiles,
+                           String ttl)
     {
         this.qualifiedTableName = qualifiedTableName;
         this.snapshotName = Objects.requireNonNull(snapshotName, "snapshotName must not be null");
         this.includeSecondaryIndexFiles = includeSecondaryIndexFiles;
+        this.ttl = ttl;
     }
 
     /**
@@ -97,6 +102,14 @@
     }
 
     /**
+     * @return the TTL for snapshot creation if provided
+     */
+    public String ttl()
+    {
+        return ttl;
+    }
+
+    /**
      * {@inheritDoc}
      */
     public String toString()
@@ -106,6 +119,7 @@
                ", tableName='" + tableName() + '\'' +
                ", snapshotName='" + snapshotName + '\'' +
                ", includeSecondaryIndexFiles=" + includeSecondaryIndexFiles +
+               ", ttl=" + ttl +
                '}';
     }
 }
diff --git a/src/main/java/org/apache/cassandra/sidecar/routes/AbstractHandler.java b/src/main/java/org/apache/cassandra/sidecar/routes/AbstractHandler.java
index c1568d3..04b3071 100644
--- a/src/main/java/org/apache/cassandra/sidecar/routes/AbstractHandler.java
+++ b/src/main/java/org/apache/cassandra/sidecar/routes/AbstractHandler.java
@@ -83,9 +83,17 @@
         HttpServerRequest request = context.request();
         String host = host(context);
         SocketAddress remoteAddress = request.remoteAddress();
-        T requestParams = extractParamsOrThrow(context);
-        logger.debug("{} received request={}, remoteAddress={}, instance={}",
-                     this.getClass().getSimpleName(), requestParams, remoteAddress, host);
+        T requestParams = null;
+        try
+        {
+            requestParams = extractParamsOrThrow(context);
+            logger.debug("{} received request={}, remoteAddress={}, instance={}",
+                         this.getClass().getSimpleName(), requestParams, remoteAddress, host);
+        }
+        catch (Exception exception)
+        {
+            processFailure(exception, context, host, remoteAddress, null);
+        }
 
         handleInternal(context, request, host, remoteAddress, requestParams);
     }
diff --git a/src/main/java/org/apache/cassandra/sidecar/routes/SnapshotsHandler.java b/src/main/java/org/apache/cassandra/sidecar/routes/SnapshotsHandler.java
index 7adb465..ceb76d1 100644
--- a/src/main/java/org/apache/cassandra/sidecar/routes/SnapshotsHandler.java
+++ b/src/main/java/org/apache/cassandra/sidecar/routes/SnapshotsHandler.java
@@ -21,6 +21,7 @@
 import java.io.FileNotFoundException;
 import java.nio.file.NoSuchFileException;
 import java.util.List;
+import java.util.Map;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
@@ -82,7 +83,8 @@
 @Singleton
 public class SnapshotsHandler extends AbstractHandler<SnapshotRequest>
 {
-    private static final String INCLUDE_SECONDARY_INDEX_FILES = "includeSecondaryIndexFiles";
+    private static final String INCLUDE_SECONDARY_INDEX_FILES_QUERY_PARAM = "includeSecondaryIndexFiles";
+    private static final String TTL_QUERY_PARAM = "ttl";
     private final SnapshotPathBuilder builder;
     private final ServiceConfiguration configuration;
 
@@ -235,14 +237,20 @@
     {
         ExecutorPools.TaskExecutorPool pool = executorPools.service();
         pool.executeBlocking(promise -> {
-                CassandraAdapterDelegate delegate = metadataFetcher.delegate(host(context));
+                CassandraAdapterDelegate delegate = metadataFetcher.delegate(host);
+                if (delegate == null)
+                    throw cassandraServiceUnavailable();
                 StorageOperations storageOperations = delegate.storageOperations();
                 if (storageOperations == null)
                     throw cassandraServiceUnavailable();
                 logger.debug("Creating snapshot request={}, remoteAddress={}, instance={}",
                              requestParams, remoteAddress, host);
+                Map<String, String> options = requestParams.ttl() != null
+                                              ? ImmutableMap.of("ttl", requestParams.ttl())
+                                              : ImmutableMap.of();
+
                 storageOperations.takeSnapshot(requestParams.snapshotName(), requestParams.keyspace(),
-                                               requestParams.tableName(), ImmutableMap.of());
+                                               requestParams.tableName(), options);
                 JsonObject jsonObject = new JsonObject()
                                         .put("result", "Success");
                 context.json(jsonObject);
@@ -338,13 +346,14 @@
     @Override
     protected SnapshotRequest extractParamsOrThrow(final RoutingContext context)
     {
-        boolean includeSecondaryIndexFiles = RequestUtils.parseBooleanHeader(context.request(),
-                                                                             INCLUDE_SECONDARY_INDEX_FILES,
-                                                                             false);
+        boolean includeSecondaryIndexFiles =
+        RequestUtils.parseBooleanQueryParam(context.request(), INCLUDE_SECONDARY_INDEX_FILES_QUERY_PARAM, false);
+        String ttl = context.request().getParam(TTL_QUERY_PARAM);
 
         SnapshotRequest snapshotRequest = new SnapshotRequest(qualifiedTableName(context),
                                                               context.pathParam("snapshot"),
-                                                              includeSecondaryIndexFiles
+                                                              includeSecondaryIndexFiles,
+                                                              ttl
         );
         validate(snapshotRequest);
         return snapshotRequest;
diff --git a/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java b/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java
index 7a72b22..72fe0ba 100644
--- a/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java
+++ b/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java
@@ -43,6 +43,7 @@
 import io.vertx.ext.web.handler.StaticHandler;
 import io.vertx.ext.web.handler.TimeoutHandler;
 import org.apache.cassandra.sidecar.adapters.base.CassandraFactory;
+import org.apache.cassandra.sidecar.adapters.cassandra41.Cassandra41Factory;
 import org.apache.cassandra.sidecar.cluster.CQLSessionProviderImpl;
 import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate;
 import org.apache.cassandra.sidecar.cluster.InstancesConfig;
@@ -319,9 +320,10 @@
     @Singleton
     public CassandraVersionProvider cassandraVersionProvider(DnsResolver dnsResolver, DriverUtils driverUtils)
     {
-        CassandraVersionProvider.Builder builder = new CassandraVersionProvider.Builder();
-        builder.add(new CassandraFactory(dnsResolver, driverUtils));
-        return builder.build();
+        return new CassandraVersionProvider.Builder()
+               .add(new CassandraFactory(dnsResolver, driverUtils))
+               .add(new Cassandra41Factory(dnsResolver, driverUtils))
+               .build();
     }
 
     @Provides
diff --git a/src/main/java/org/apache/cassandra/sidecar/utils/CassandraVersionProvider.java b/src/main/java/org/apache/cassandra/sidecar/utils/CassandraVersionProvider.java
index eb86040..8e96a68 100644
--- a/src/main/java/org/apache/cassandra/sidecar/utils/CassandraVersionProvider.java
+++ b/src/main/java/org/apache/cassandra/sidecar/utils/CassandraVersionProvider.java
@@ -56,17 +56,17 @@
     {
         ICassandraFactory result = versions.get(0);
 
-        for (ICassandraFactory f : versions)
+        for (ICassandraFactory factory : versions)
         {
             SimpleCassandraVersion currentMinVersion = SimpleCassandraVersion.create(result);
-            SimpleCassandraVersion nextVersion = SimpleCassandraVersion.create(f);
+            SimpleCassandraVersion nextVersion = SimpleCassandraVersion.create(factory);
 
             // skip if we can rule this out early
             if (nextVersion.isGreaterThan(requestedVersion)) continue;
 
             if (requestedVersion.isGreaterThan(currentMinVersion))
             {
-                result = f;
+                result = factory;
             }
         }
         return result;
diff --git a/src/main/java/org/apache/cassandra/sidecar/utils/RequestUtils.java b/src/main/java/org/apache/cassandra/sidecar/utils/RequestUtils.java
index 8a5f62c..d9e7bc8 100644
--- a/src/main/java/org/apache/cassandra/sidecar/utils/RequestUtils.java
+++ b/src/main/java/org/apache/cassandra/sidecar/utils/RequestUtils.java
@@ -26,17 +26,17 @@
 public class RequestUtils
 {
     /**
-     * Parses a boolean parameter from the {@code request}, for the given {@code headerName}. If the request param
+     * Parses a boolean parameter from the {@code request}, for the given {@code paramName}. If the request param
      * is not {@code true} or {@code false}, it returns the {@code defaultValue}.
      *
      * @param request      the request
-     * @param headerName   the name of the header
+     * @param paramName    the name of the query parameter
      * @param defaultValue the default value when the request parameter does not match {@code true} or {@code false}
-     * @return the parsed value for the {@code headerName} from the {@code request}
+     * @return the parsed value for the {@code paramName} from the {@code request}
      */
-    public static boolean parseBooleanHeader(HttpServerRequest request, String headerName, boolean defaultValue)
+    public static boolean parseBooleanQueryParam(HttpServerRequest request, String paramName, boolean defaultValue)
     {
-        String value = request.getParam(headerName);
+        String value = request.getParam(paramName);
         if ("true".equalsIgnoreCase(value))
             return true;
         if ("false".equalsIgnoreCase(value))
diff --git a/src/main/java/org/apache/cassandra/sidecar/utils/SimpleCassandraVersion.java b/src/main/java/org/apache/cassandra/sidecar/utils/SimpleCassandraVersion.java
index 54d54a5..9c774f5 100644
--- a/src/main/java/org/apache/cassandra/sidecar/utils/SimpleCassandraVersion.java
+++ b/src/main/java/org/apache/cassandra/sidecar/utils/SimpleCassandraVersion.java
@@ -105,6 +105,7 @@
     }
 
 
+    @Override
     public int compareTo(SimpleCassandraVersion other)
     {
         if (major < other.major)
diff --git a/src/test/integration/org/apache/cassandra/sidecar/routes/SnapshotsHandlerIntegrationTest.java b/src/test/integration/org/apache/cassandra/sidecar/routes/SnapshotsHandlerIntegrationTest.java
index 3d7b371..a5a6d4a 100644
--- a/src/test/integration/org/apache/cassandra/sidecar/routes/SnapshotsHandlerIntegrationTest.java
+++ b/src/test/integration/org/apache/cassandra/sidecar/routes/SnapshotsHandlerIntegrationTest.java
@@ -18,14 +18,26 @@
 
 package org.apache.cassandra.sidecar.routes;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
 import java.nio.file.Path;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
 import java.util.List;
+import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
 import org.junit.jupiter.api.extension.ExtendWith;
 
 import com.datastax.driver.core.Session;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
 import io.vertx.ext.web.client.WebClient;
 import io.vertx.ext.web.client.predicate.ResponsePredicate;
 import io.vertx.junit5.VertxExtension;
@@ -33,7 +45,10 @@
 import org.apache.cassandra.sidecar.common.data.QualifiedTableName;
 import org.apache.cassandra.sidecar.testing.IntegrationTestBase;
 import org.apache.cassandra.testing.CassandraIntegrationTest;
+import org.apache.cassandra.testing.CassandraTestContext;
+import org.apache.cassandra.testing.SimpleCassandraVersion;
 
+import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
 import static io.netty.handler.codec.http.HttpResponseStatus.OK;
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -68,6 +83,69 @@
     }
 
     @CassandraIntegrationTest
+    void createSnapshotWithTtlFailsWhenUnitsAreNotSpecified(VertxTestContext context,
+                                                            CassandraTestContext cassandraTestContext)
+    throws InterruptedException
+    {
+        if (cassandraTestContext.version.compareTo(SimpleCassandraVersion.create(4, 1, 0)) < 0)
+        {
+            // TTL is only supported in Cassandra 4.1
+            context.completeNow();
+            return;
+        }
+
+        createTestKeyspace();
+        QualifiedTableName tableName = createTestTableAndPopulate();
+
+        WebClient client = WebClient.create(vertx);
+        String expectedErrorMessage = "Invalid duration: 500 Accepted units:[SECONDS, MINUTES, HOURS, DAYS] " +
+                                      "where case matters and only non-negative values.";
+        String testRoute = String.format("/api/v1/keyspaces/%s/tables/%s/snapshots/my-snapshot?ttl=500",
+                                         tableName.maybeQuotedKeyspace(), tableName.maybeQuotedTableName());
+        client.put(server.actualPort(), "127.0.0.1", testRoute)
+              .expect(ResponsePredicate.SC_BAD_REQUEST)
+              .send(context.succeeding(response -> context.verify(() -> {
+                  assertThat(response.statusCode()).isEqualTo(BAD_REQUEST.code());
+                  assertThat(response.bodyAsJsonObject().getString("message"))
+                  .isEqualTo(expectedErrorMessage);
+                  context.completeNow();
+              })));
+        // wait until test completes
+        assertThat(context.awaitCompletion(30, TimeUnit.SECONDS)).isTrue();
+    }
+
+    @CassandraIntegrationTest
+    void createSnapshotWithTtlFailsWhenTtlIsTooSmall(VertxTestContext context,
+                                                     CassandraTestContext cassandraTestContext)
+    throws InterruptedException
+    {
+        if (cassandraTestContext.version.compareTo(SimpleCassandraVersion.create(4, 1, 0)) < 0)
+        {
+            // TTL is only supported in Cassandra 4.1
+            context.completeNow();
+            return;
+        }
+
+        createTestKeyspace();
+        QualifiedTableName tableName = createTestTableAndPopulate();
+
+        WebClient client = WebClient.create(vertx);
+        String expectedErrorMessage = "ttl for snapshot must be at least 60 seconds";
+        String testRoute = String.format("/api/v1/keyspaces/%s/tables/%s/snapshots/my-snapshot?ttl=1s",
+                                         tableName.maybeQuotedKeyspace(), tableName.maybeQuotedTableName());
+        client.put(server.actualPort(), "127.0.0.1", testRoute)
+              .expect(ResponsePredicate.SC_BAD_REQUEST)
+              .send(context.succeeding(response -> context.verify(() -> {
+                  assertThat(response.statusCode()).isEqualTo(BAD_REQUEST.code());
+                  assertThat(response.bodyAsJsonObject().getString("message"))
+                  .isEqualTo(expectedErrorMessage);
+                  context.completeNow();
+              })));
+        // wait until test completes
+        assertThat(context.awaitCompletion(30, TimeUnit.SECONDS)).isTrue();
+    }
+
+    @CassandraIntegrationTest
     void createSnapshotFailsWhenSnapshotAlreadyExists(VertxTestContext context)
     throws InterruptedException
     {
@@ -123,6 +201,78 @@
     }
 
     @CassandraIntegrationTest
+    void testCreateSnapshotEndpointWithTtl(VertxTestContext context,
+                                           CassandraTestContext cassandraTestContext) throws InterruptedException
+    {
+        // TTL is only supported in Cassandra 4.1
+        boolean validateExpectedTtl = cassandraTestContext.version
+                                      .compareTo(SimpleCassandraVersion.create("4.1.0")) >= 0;
+
+        createTestKeyspace();
+        QualifiedTableName tableName = createTestTableAndPopulate();
+
+        long expectedTtlInSeconds = 61;
+        WebClient client = WebClient.create(vertx);
+        String testRoute = String.format("/api/v1/keyspaces/%s/tables/%s/snapshots/ttl-snapshot?ttl=%ds",
+                                         tableName.maybeQuotedKeyspace(), tableName.maybeQuotedTableName(),
+                                         expectedTtlInSeconds);
+        client.put(server.actualPort(), "127.0.0.1", testRoute)
+              .expect(ResponsePredicate.SC_OK)
+              .send(context.succeeding(response -> context.verify(() -> {
+                  assertThat(response.statusCode()).isEqualTo(OK.code());
+
+                  // validate that the snapshot is created
+                  List<Path> found = findChildFile(sidecarTestContext, "127.0.0.1",
+                                                   "ttl-snapshot");
+                  assertThat(found).isNotEmpty()
+                                   .anyMatch(p -> p.toString().endsWith("manifest.json"))
+                                   .anyMatch(p -> p.toString().endsWith("schema.cql"))
+                                   .anyMatch(p -> p.toString().endsWith("-big-Data.db"));
+
+                  // get manifest
+                  Optional<Path> manifest = found.stream()
+                                                 .filter(p -> p.toString().endsWith("manifest.json"))
+                                                 .findFirst();
+
+                  assertThat(manifest).isPresent();
+                  assertThat(manifest.get()).exists();
+                  validateManifestExpirationDate(manifest.get(), expectedTtlInSeconds, validateExpectedTtl);
+
+                  context.completeNow();
+              })));
+        // wait until test completes
+        assertThat(context.awaitCompletion(30, TimeUnit.SECONDS)).isTrue();
+    }
+
+    private void validateManifestExpirationDate(Path manifestPath, long expectedTtl, boolean validateExpectedTtl)
+    {
+        ObjectMapper jsonMapper = new ObjectMapper(new JsonFactory());
+        jsonMapper.registerModule(new JavaTimeModule());
+        jsonMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
+        jsonMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+
+        try (InputStream in = Files.newInputStream(manifestPath))
+        {
+            SnapshotManifest manifest = jsonMapper.readValue(in, SnapshotManifest.class);
+
+            if (validateExpectedTtl)
+            {
+                long actualTtl = ChronoUnit.SECONDS.between(manifest.createdAt, manifest.expiresAt);
+                assertThat(actualTtl).isEqualTo(expectedTtl);
+            }
+            else
+            {
+                assertThat(manifest.createdAt).isNull();
+                assertThat(manifest.expiresAt).isNull();
+            }
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @CassandraIntegrationTest
     void testCreateSnapshotEndpointWithMixedCaseTableName(VertxTestContext context)
     throws InterruptedException
     {
@@ -231,7 +381,7 @@
     private QualifiedTableName createTestTableAndPopulate(String tableNamePrefix)
     {
         QualifiedTableName tableName = createTestTable(tableNamePrefix,
-        "CREATE TABLE %s (id text PRIMARY KEY, name text);");
+                                                       "CREATE TABLE %s (id text PRIMARY KEY, name text);");
         Session session = maybeGetSession();
 
         session.execute("INSERT INTO " + tableName + " (id, name) VALUES ('1', 'Francisco');");
@@ -261,4 +411,16 @@
         // wait until test completes
         assertThat(context.awaitCompletion(30, TimeUnit.SECONDS)).isTrue();
     }
+
+    static class SnapshotManifest
+    {
+        @JsonProperty("files")
+        public List<String> files;
+
+        @JsonProperty("created_at")
+        public Instant createdAt;
+
+        @JsonProperty("expires_at")
+        public Instant expiresAt;
+    }
 }
diff --git a/src/test/integration/org/apache/cassandra/sidecar/testing/CassandraSidecarTestContext.java b/src/test/integration/org/apache/cassandra/sidecar/testing/CassandraSidecarTestContext.java
index 8729330..0b97589 100644
--- a/src/test/integration/org/apache/cassandra/sidecar/testing/CassandraSidecarTestContext.java
+++ b/src/test/integration/org/apache/cassandra/sidecar/testing/CassandraSidecarTestContext.java
@@ -35,6 +35,7 @@
 import org.apache.cassandra.distributed.impl.InstanceConfig;
 import org.apache.cassandra.distributed.shared.JMXUtil;
 import org.apache.cassandra.sidecar.adapters.base.CassandraFactory;
+import org.apache.cassandra.sidecar.adapters.cassandra41.Cassandra41Factory;
 import org.apache.cassandra.sidecar.cluster.CQLSessionProviderImpl;
 import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate;
 import org.apache.cassandra.sidecar.cluster.InstancesConfig;
@@ -104,8 +105,11 @@
 
     public static CassandraVersionProvider cassandraVersionProvider(DnsResolver dnsResolver)
     {
+        DriverUtils driverUtils = new DriverUtils();
         return new CassandraVersionProvider.Builder()
-               .add(new CassandraFactory(dnsResolver, new DriverUtils())).build();
+               .add(new CassandraFactory(dnsResolver, driverUtils))
+               .add(new Cassandra41Factory(dnsResolver, driverUtils))
+               .build();
     }
 
     private static int tryGetIntConfig(IInstanceConfig config, String configName, int defaultValue)
diff --git a/src/test/java/org/apache/cassandra/sidecar/data/SnapshotRequestTest.java b/src/test/java/org/apache/cassandra/sidecar/data/SnapshotRequestTest.java
index ace902e..19c39fe 100644
--- a/src/test/java/org/apache/cassandra/sidecar/data/SnapshotRequestTest.java
+++ b/src/test/java/org/apache/cassandra/sidecar/data/SnapshotRequestTest.java
@@ -23,12 +23,15 @@
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
+/**
+ * Unit tests for the {@link SnapshotRequest} object
+ */
 class SnapshotRequestTest
 {
     @Test
     void failsWhenKeyspaceIsNull()
     {
-        assertThatThrownBy(() -> new SnapshotRequest(null, "table", "snapshot", false))
+        assertThatThrownBy(() -> new SnapshotRequest(null, "table", "snapshot", false, null))
         .isInstanceOf(NullPointerException.class)
         .hasMessageContaining("keyspace must not be null");
     }
@@ -36,7 +39,7 @@
     @Test
     void failsWhenTableNameIsNull()
     {
-        assertThatThrownBy(() -> new SnapshotRequest("ks", null, "snapshot", true))
+        assertThatThrownBy(() -> new SnapshotRequest("ks", null, "snapshot", true, null))
         .isInstanceOf(NullPointerException.class)
         .hasMessageContaining("tableName must not be null");
     }
@@ -44,7 +47,7 @@
     @Test
     void failsWhenSnapshotNameIsNull()
     {
-        assertThatThrownBy(() -> new SnapshotRequest("ks", "table", null, false))
+        assertThatThrownBy(() -> new SnapshotRequest("ks", "table", null, false, null))
         .isInstanceOf(NullPointerException.class)
         .hasMessageContaining("snapshotName must not be null");
     }
@@ -52,7 +55,7 @@
     @Test
     void testValidRequest()
     {
-        SnapshotRequest request = new SnapshotRequest("ks", "table", "snapshot", false);
+        SnapshotRequest request = new SnapshotRequest("ks", "table", "snapshot", false, null);
 
         assertThat(request.qualifiedTableName()).isNotNull();
         assertThat(request.qualifiedTableName().keyspace()).isEqualTo("ks");
@@ -61,7 +64,27 @@
         assertThat(request.tableName()).isEqualTo("table");
         assertThat(request.snapshotName()).isEqualTo("snapshot");
         assertThat(request.includeSecondaryIndexFiles()).isFalse();
+        assertThat(request.ttl()).isNull();
         assertThat(request.toString()).isEqualTo("SnapshotRequest{keyspace='ks', tableName='table', " +
-                                                 "snapshotName='snapshot', includeSecondaryIndexFiles=false}");
+                                                 "snapshotName='snapshot', includeSecondaryIndexFiles=false, " +
+                                                 "ttl=null}");
+    }
+
+    @Test
+    void testValidRequestWithTTL()
+    {
+        SnapshotRequest request = new SnapshotRequest("ks", "table", "snapshot", false, "3d");
+
+        assertThat(request.qualifiedTableName()).isNotNull();
+        assertThat(request.qualifiedTableName().keyspace()).isEqualTo("ks");
+        assertThat(request.qualifiedTableName().tableName()).isEqualTo("table");
+        assertThat(request.keyspace()).isEqualTo("ks");
+        assertThat(request.tableName()).isEqualTo("table");
+        assertThat(request.snapshotName()).isEqualTo("snapshot");
+        assertThat(request.includeSecondaryIndexFiles()).isFalse();
+        assertThat(request.ttl()).isEqualTo("3d");
+        assertThat(request.toString()).isEqualTo("SnapshotRequest{keyspace='ks', tableName='table', " +
+                                                 "snapshotName='snapshot', includeSecondaryIndexFiles=false, " +
+                                                 "ttl=3d}");
     }
 }
diff --git a/src/test/java/org/apache/cassandra/sidecar/utils/RequestUtilsTest.java b/src/test/java/org/apache/cassandra/sidecar/utils/RequestUtilsTest.java
index 549e9b5..8790deb 100644
--- a/src/test/java/org/apache/cassandra/sidecar/utils/RequestUtilsTest.java
+++ b/src/test/java/org/apache/cassandra/sidecar/utils/RequestUtilsTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.cassandra.sidecar.utils;
 
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import io.vertx.core.http.HttpServerRequest;
@@ -32,35 +33,42 @@
  */
 class RequestUtilsTest
 {
-    @Test
-    void testParseBooleanHeader()
+    HttpServerRequest mockRequest;
+
+    @BeforeEach
+    void setup()
     {
-        HttpServerRequest mockRequest = mock(Http1xServerRequest.class);
-        assertThat(RequestUtils.parseBooleanHeader(mockRequest, "non-existent", true)).isTrue();
-        assertThat(RequestUtils.parseBooleanHeader(mockRequest, "non-existent-false", false)).isFalse();
+        mockRequest = mock(Http1xServerRequest.class);
+    }
+
+    @Test
+    void testParseBooleanQueryParam()
+    {
+        assertThat(RequestUtils.parseBooleanQueryParam(mockRequest, "non-existent", true)).isTrue();
+        assertThat(RequestUtils.parseBooleanQueryParam(mockRequest, "non-existent-false", false)).isFalse();
 
         when(mockRequest.getParam("false-param")).thenReturn("false");
-        assertThat(RequestUtils.parseBooleanHeader(mockRequest, "false-param", true)).isFalse();
+        assertThat(RequestUtils.parseBooleanQueryParam(mockRequest, "false-param", true)).isFalse();
 
         when(mockRequest.getParam("fAlSe-mixed-case-param")).thenReturn("fAlSe");
-        assertThat(RequestUtils.parseBooleanHeader(mockRequest, "fAlSe-mixed-case-param", true)).isFalse();
+        assertThat(RequestUtils.parseBooleanQueryParam(mockRequest, "fAlSe-mixed-case-param", true)).isFalse();
 
         when(mockRequest.getParam("FALSE-uppercase-param")).thenReturn("FALSE");
-        assertThat(RequestUtils.parseBooleanHeader(mockRequest, "FALSE-uppercase-param", true)).isFalse();
+        assertThat(RequestUtils.parseBooleanQueryParam(mockRequest, "FALSE-uppercase-param", true)).isFalse();
 
         when(mockRequest.getParam("true-param")).thenReturn("true");
-        assertThat(RequestUtils.parseBooleanHeader(mockRequest, "true-param", false)).isTrue();
+        assertThat(RequestUtils.parseBooleanQueryParam(mockRequest, "true-param", false)).isTrue();
 
         when(mockRequest.getParam("TrUe-mixed-case-param")).thenReturn("TrUe");
-        assertThat(RequestUtils.parseBooleanHeader(mockRequest, "TrUe-mixed-case-param", false)).isTrue();
+        assertThat(RequestUtils.parseBooleanQueryParam(mockRequest, "TrUe-mixed-case-param", false)).isTrue();
 
         when(mockRequest.getParam("TRUE-uppercase-param")).thenReturn("TRUE");
-        assertThat(RequestUtils.parseBooleanHeader(mockRequest, "TRUE-uppercase-param", false)).isTrue();
+        assertThat(RequestUtils.parseBooleanQueryParam(mockRequest, "TRUE-uppercase-param", false)).isTrue();
 
         when(mockRequest.getParam("default-value-true")).thenReturn("not-a-valid-true");
-        assertThat(RequestUtils.parseBooleanHeader(mockRequest, "default-value-true", false)).isFalse();
+        assertThat(RequestUtils.parseBooleanQueryParam(mockRequest, "default-value-true", false)).isFalse();
 
         when(mockRequest.getParam("default-value-false")).thenReturn("not-a-valid-false");
-        assertThat(RequestUtils.parseBooleanHeader(mockRequest, "default-value-false", true)).isTrue();
+        assertThat(RequestUtils.parseBooleanQueryParam(mockRequest, "default-value-false", true)).isTrue();
     }
 }
diff --git a/src/testFixtures/java/org/apache/cassandra/sidecar/snapshots/AbstractSnapshotPathBuilderTest.java b/src/testFixtures/java/org/apache/cassandra/sidecar/snapshots/AbstractSnapshotPathBuilderTest.java
index a6727fc..0b79b8c 100644
--- a/src/testFixtures/java/org/apache/cassandra/sidecar/snapshots/AbstractSnapshotPathBuilderTest.java
+++ b/src/testFixtures/java/org/apache/cassandra/sidecar/snapshots/AbstractSnapshotPathBuilderTest.java
@@ -227,7 +227,8 @@
                                                       new SnapshotRequest("ks",
                                                                           "table",
                                                                           "snapshot",
-                                                                          false)),
+                                                                          false,
+                                                                          null)),
                                        "No data directories are available for host 'emptyDataDirInstance'");
     }
 
@@ -244,7 +245,8 @@
                                                       new SnapshotRequest("ks",
                                                                           "table",
                                                                           "snapshot",
-                                                                          false)),
+                                                                          false,
+                                                                          null)),
                                        "Keyspace 'ks' does not exist");
     }
 
@@ -261,7 +263,8 @@
                                                       new SnapshotRequest("non_existent",
                                                                           "table",
                                                                           "snapshot",
-                                                                          false)),
+                                                                          false,
+                                                                          null)),
                                        "Keyspace 'non_existent' does not exist");
     }
 
@@ -278,7 +281,8 @@
                                                       new SnapshotRequest("not_a_keyspace_dir",
                                                                           "table",
                                                                           "snapshot",
-                                                                          false)),
+                                                                          false,
+                                                                          null)),
                                        "Keyspace 'not_a_keyspace_dir' does not exist");
     }
 
@@ -295,7 +299,8 @@
                                                       new SnapshotRequest("ks1",
                                                                           "non_existent",
                                                                           "snapshot",
-                                                                          false)),
+                                                                          false,
+                                                                          null)),
                                        "Table 'non_existent' does not exist");
     }
 
@@ -313,7 +318,8 @@
                                                       new SnapshotRequest("ks1",
                                                                           "table",
                                                                           "snapshot",
-                                                                          false)),
+                                                                          false,
+                                                                          null)),
                                        "Table 'table' does not exist");
     }
 
@@ -330,7 +336,8 @@
                                                       new SnapshotRequest("ks1",
                                                                           "not_a_table_dir",
                                                                           "snapshot",
-                                                                          false)),
+                                                                          false,
+                                                                          null)),
                                        "Table 'not_a_table_dir' does not exist");
     }
 
@@ -347,7 +354,8 @@
                                                       new SnapshotRequest("ks1",
                                                                           "table1",
                                                                           "non_existent",
-                                                                          false)),
+                                                                          false,
+                                                                          null)),
                                        "Snapshot directory 'non_existent' does not exist");
     }
 
@@ -364,7 +372,8 @@
                                                       new SnapshotRequest("ks1",
                                                                           "table1",
                                                                           "not_a_snapshot_dir",
-                                                                          false)),
+                                                                          false,
+                                                                          null)),
                                        "Snapshot directory 'not_a_snapshot_dir' does not exist");
     }
 
@@ -508,7 +517,8 @@
                                               new SnapshotRequest("ks1",
                                                                   "table1",
                                                                   "backup.2022-03-17-04-PDT",
-                                                                  false)),
+                                                                  false,
+                                                                  null)),
                                expectedPath);
 
         expectedPath = dataDir0.getAbsolutePath() + "/data/ks2/table2/snapshots/ea823202-a62c-4603-bb6a-4e15d79091cd";
@@ -516,7 +526,8 @@
                                               new SnapshotRequest("ks2",
                                                                   "table2",
                                                                   "ea823202-a62c-4603-bb6a-4e15d79091cd",
-                                                                  false)),
+                                                                  false,
+                                                                  null)),
                                expectedPath);
 
         expectedPath = dataDir1.getAbsolutePath() + "/ks3/table3/snapshots/snapshot1";
@@ -524,7 +535,8 @@
                                               new SnapshotRequest("ks3",
                                                                   "table3",
                                                                   "snapshot1",
-                                                                  false)),
+                                                                  false,
+                                                                  null)),
                                expectedPath);
 
         // table table4 shares the prefix with table table4abc
@@ -535,7 +547,8 @@
                                               new SnapshotRequest("ks4",
                                                                   "table4abc",
                                                                   "this_is_a_valid_snapshot_name_i_❤_u",
-                                                                  false)),
+                                                                  false,
+                                                                  null)),
                                expectedPath);
     }
 
@@ -588,7 +601,8 @@
                                       new SnapshotRequest("ks1",
                                                           "a_table",
                                                           "a_snapshot",
-                                                          false)),
+                                                          false,
+                                                          null)),
                                expectedPath);
 
         expectedPath = atableWithUUID.getAbsolutePath() + "/snapshots/a_snapshot/index.db";
@@ -606,7 +620,8 @@
                                       new SnapshotRequest("ks1",
                                                           "a_table",
                                                           "a_snapshot",
-                                                          false)),
+                                                          false,
+                                                          null)),
                                expectedPath);
 
         expectedPath = atableWithUUID.getAbsolutePath() + "/snapshots/a_snapshot/nb-203-big-TOC.txt";
@@ -624,7 +639,8 @@
                                       new SnapshotRequest("ks1",
                                                           "a_table",
                                                           "a_snapshot",
-                                                          false)),
+                                                          false,
+                                                          null)),
                                expectedPath);
     }
 
@@ -683,7 +699,8 @@
                                       new SnapshotRequest("ks4",
                                                           "table4",
                                                           "this_is_a_valid_snapshot_name_i_❤_u",
-                                                          false)),
+                                                          false,
+                                                          null)),
                                expectedPath);
 
         expectedPath = table4New.getAbsolutePath()
@@ -703,7 +720,8 @@
                                       new SnapshotRequest("ks4",
                                                           "table4",
                                                           "this_is_a_valid_snapshot_name_i_❤_u",
-                                                          false)),
+                                                          false,
+                                                          null)),
                                expectedPath);
 
         expectedPath = table4New.getAbsolutePath()
@@ -723,7 +741,8 @@
                                       new SnapshotRequest("ks4",
                                                           "table4",
                                                           "this_is_a_valid_snapshot_name_i_❤_u",
-                                                          false)),
+                                                          false,
+                                                          null)),
                                expectedPath);
     }