Add Stream SSTable API to Sidecar to stream SSTable components through zero copy streaming

Patch by Saranya Krishnakumar; reviewed by Dinesh Joshi, Yifan Cai for CASSANDRASC-28
diff --git a/.gitignore b/.gitignore
index b00da53..f17cb17 100644
--- a/.gitignore
+++ b/.gitignore
@@ -5,7 +5,6 @@
 src/gen-java/
 src/resources/org/apache/cassandra/config/
 logs/
-data/
 conf/hotspot_compiler
 doc/cql3/CQL.html
 
diff --git a/README.md b/README.md
index 17187a6..f7643f6 100644
--- a/README.md
+++ b/README.md
@@ -19,6 +19,11 @@
 
     $ ./gradlew run
   
+Configuring Cassandra Instance
+------------------------------
+
+While setting up cassandra instance, make sure the data directories of cassandra are in the path stored in sidecar.yaml file, else modify data directories path to point to the correct directories for stream APIs to work.
+
 Testing
 ---------
 
diff --git a/build.gradle b/build.gradle
index 8bfc590..836c4dd 100644
--- a/build.gradle
+++ b/build.gradle
@@ -152,7 +152,7 @@
     compile 'io.vertx:vertx-web-client:3.8.5'
 
     compile 'io.swagger.core.v3:swagger-jaxrs2:2.1.0'
-    compile 'org.jboss.resteasy:resteasy-vertx:3.1.0.Final'
+    compile 'org.jboss.resteasy:resteasy-vertx:3.1.2.Final'
     compile group: 'org.jboss.spec.javax.servlet', name: 'jboss-servlet-api_4.0_spec', version: '2.0.0.Final'
 
     // Trying to be exactly compatible with Cassandra's deps
@@ -170,6 +170,7 @@
 
     testCompile "org.junit.jupiter:junit-jupiter-api:${project.junitVersion}"
     testCompile "org.junit.jupiter:junit-jupiter-params:${project.junitVersion}"
+    testCompile "org.assertj:assertj-core:3.14.0"
     testRuntimeOnly "org.junit.jupiter:junit-jupiter-engine:${project.junitVersion}"
 
     testCompile group: 'org.cassandraunit', name: 'cassandra-unit-shaded', version: '3.11.2.0'
diff --git a/src/main/dist/conf/sidecar.yaml b/src/main/dist/conf/sidecar.yaml
index f7e1ce3..39dc0a4 100644
--- a/src/main/dist/conf/sidecar.yaml
+++ b/src/main/dist/conf/sidecar.yaml
@@ -5,10 +5,15 @@
 cassandra:
   - host: localhost
   - port: 9042
+  - data_dirs: /cassandra/d1/data, /cassandra/d2/data
 
 sidecar:
   - host: 0.0.0.0
   - port: 9043
+  - throttle:
+    - stream_requests_per_sec: 5000
+    - delay_sec: 5
+    - timeout_sec: 10
 #
 # Enable SSL configuration (Disabled by default)
 #
diff --git a/src/main/java/com/google/common/util/concurrent/SidecarRateLimiter.java b/src/main/java/com/google/common/util/concurrent/SidecarRateLimiter.java
new file mode 100644
index 0000000..4a1bd79
--- /dev/null
+++ b/src/main/java/com/google/common/util/concurrent/SidecarRateLimiter.java
@@ -0,0 +1,36 @@
+package com.google.common.util.concurrent;
+
+/**
+ * Wrapper class over guava Rate Limiter, uses SmoothBursty Ratelimiter. This class mainly exists to expose
+ * package protected method queryEarliestAvailable of guava RateLimiter
+ */
+public class SidecarRateLimiter
+{
+    private final RateLimiter rateLimiter;
+
+    private SidecarRateLimiter(final double permitsPerSecond)
+    {
+        this.rateLimiter = RateLimiter.create(permitsPerSecond);
+    }
+
+    public static SidecarRateLimiter create(final double permitsPerSecond)
+    {
+        return new SidecarRateLimiter(permitsPerSecond);
+    }
+
+    /**
+     * Returns earliest time permits will become available
+     */
+    public long queryEarliestAvailable(final long nowMicros)
+    {
+        return this.rateLimiter.queryEarliestAvailable(nowMicros);
+    }
+
+    /**
+     * Tries to reserve 1 permit, if not available immediately returns false
+     */
+    public boolean tryAcquire()
+    {
+        return this.rateLimiter.tryAcquire();
+    }
+}
diff --git a/src/main/java/org/apache/cassandra/sidecar/Configuration.java b/src/main/java/org/apache/cassandra/sidecar/Configuration.java
index 5f289c5..088f956 100644
--- a/src/main/java/org/apache/cassandra/sidecar/Configuration.java
+++ b/src/main/java/org/apache/cassandra/sidecar/Configuration.java
@@ -18,6 +18,8 @@
 
 package org.apache.cassandra.sidecar;
 
+import java.util.Collection;
+import java.util.List;
 import javax.annotation.Nullable;
 
 /**
@@ -31,6 +33,9 @@
     /* Cassandra Port */
     private final Integer cassandraPort;
 
+    /* Cassandra Data Dirs */
+    private Collection<String> cassandraDataDirs;
+
     /* Sidecar's HTTP REST API port */
     private final Integer port;
 
@@ -55,15 +60,22 @@
 
     private final boolean isSslEnabled;
 
-    public Configuration(String cassandraHost, Integer cassandraPort, String host, Integer port,
-                         Integer healthCheckFrequencyMillis, boolean isSslEnabled,
-                         @Nullable String keyStorePath,
-                         @Nullable String keyStorePassword,
-                         @Nullable String trustStorePath,
-                         @Nullable String trustStorePassword)
+    private final long rateLimitStreamRequestsPerSecond;
+
+    private final long throttleTimeoutInSeconds;
+
+    private final long throttleDelayInSeconds;
+
+    public Configuration(String cassandraHost, Integer cassandraPort, List<String> cassandraDataDirs, String host,
+                         Integer port, Integer healthCheckFrequencyMillis, boolean isSslEnabled,
+                         @Nullable String keyStorePath, @Nullable String keyStorePassword,
+                         @Nullable String trustStorePath, @Nullable String trustStorePassword,
+                         long rateLimitStreamRequestsPerSecond, long throttleTimeoutInSeconds,
+                         long throttleDelayInSeconds)
     {
         this.cassandraHost = cassandraHost;
         this.cassandraPort = cassandraPort;
+        this.cassandraDataDirs = cassandraDataDirs;
         this.host = host;
         this.port = port;
         this.healthCheckFrequencyMillis = healthCheckFrequencyMillis;
@@ -73,6 +85,9 @@
         this.trustStorePath = trustStorePath;
         this.trustStorePassword = trustStorePassword;
         this.isSslEnabled = isSslEnabled;
+        this.rateLimitStreamRequestsPerSecond = rateLimitStreamRequestsPerSecond;
+        this.throttleTimeoutInSeconds = throttleTimeoutInSeconds;
+        this.throttleDelayInSeconds = throttleDelayInSeconds;
     }
 
     /**
@@ -96,6 +111,16 @@
     }
 
     /**
+     * Get Cassandra data dirs
+     *
+     * @return
+     */
+    public Collection<String> getCassandraDataDirs()
+    {
+        return cassandraDataDirs;
+    }
+
+    /**
      *  Sidecar's listen address
      *
      * @return
@@ -180,12 +205,33 @@
     }
 
     /**
+     * Get number of stream requests accepted per second
+     *
+     * @return
+     */
+    public long getRateLimitStreamRequestsPerSecond()
+    {
+        return rateLimitStreamRequestsPerSecond;
+    }
+
+    public long getThrottleTimeoutInSeconds()
+    {
+        return throttleTimeoutInSeconds;
+    }
+
+    public long getThrottleDelayInSeconds()
+    {
+        return throttleDelayInSeconds;
+    }
+
+    /**
      * Configuration Builder
      */
     public static class Builder
     {
         private String cassandraHost;
         private Integer cassandraPort;
+        private List<String> cassandraDataDirs;
         private String host;
         private Integer port;
         private Integer healthCheckFrequencyMillis;
@@ -194,6 +240,9 @@
         private String trustStorePath;
         private String trustStorePassword;
         private boolean isSslEnabled;
+        private long rateLimitStreamRequestsPerSecond;
+        private long throttleTimeoutInSeconds;
+        private long throttleDelayInSeconds;
 
         public Builder setCassandraHost(String host)
         {
@@ -207,6 +256,12 @@
             return this;
         }
 
+        public Builder setCassandraDataDirs(List<String> dataDirs)
+        {
+            this.cassandraDataDirs = dataDirs;
+            return this;
+        }
+
         public Builder setHost(String host)
         {
             this.host = host;
@@ -255,10 +310,30 @@
             return this;
         }
 
+        public Builder setRateLimitStreamRequestsPerSecond(long rateLimitStreamRequestsPerSecond)
+        {
+            this.rateLimitStreamRequestsPerSecond = rateLimitStreamRequestsPerSecond;
+            return this;
+        }
+
+        public Builder setThrottleTimeoutInSeconds(long throttleTimeoutInSeconds)
+        {
+            this.throttleTimeoutInSeconds = throttleTimeoutInSeconds;
+            return this;
+        }
+
+        public Builder setThrottleDelayInSeconds(long throttleDelayInSeconds)
+        {
+            this.throttleDelayInSeconds = throttleDelayInSeconds;
+            return this;
+        }
+
         public Configuration build()
         {
-            return new Configuration(cassandraHost, cassandraPort, host, port, healthCheckFrequencyMillis, isSslEnabled,
-                                     keyStorePath, keyStorePassword, trustStorePath, trustStorePassword);
+            return new Configuration(cassandraHost, cassandraPort, cassandraDataDirs, host, port,
+                                     healthCheckFrequencyMillis, isSslEnabled, keyStorePath, keyStorePassword,
+                                     trustStorePath, trustStorePassword, rateLimitStreamRequestsPerSecond,
+                                     throttleTimeoutInSeconds, throttleDelayInSeconds);
         }
     }
 }
diff --git a/src/main/java/org/apache/cassandra/sidecar/MainModule.java b/src/main/java/org/apache/cassandra/sidecar/MainModule.java
index 27eefd7..223ab97 100644
--- a/src/main/java/org/apache/cassandra/sidecar/MainModule.java
+++ b/src/main/java/org/apache/cassandra/sidecar/MainModule.java
@@ -23,6 +23,7 @@
 import java.net.MalformedURLException;
 import java.net.URL;
 
+import com.google.common.util.concurrent.SidecarRateLimiter;
 import org.apache.commons.configuration2.YAMLConfiguration;
 import org.apache.commons.configuration2.ex.ConfigurationException;
 import org.slf4j.Logger;
@@ -45,7 +46,10 @@
 import org.apache.cassandra.sidecar.common.CassandraAdapterDelegate;
 import org.apache.cassandra.sidecar.common.CassandraVersionProvider;
 import org.apache.cassandra.sidecar.routes.HealthService;
+import org.apache.cassandra.sidecar.routes.StreamSSTableComponent;
 import org.apache.cassandra.sidecar.routes.SwaggerOpenApiResource;
+import org.apache.cassandra.sidecar.utils.CachedFilePathBuilder;
+import org.apache.cassandra.sidecar.utils.FilePathBuilder;
 import org.jboss.resteasy.plugins.server.vertx.VertxRegistry;
 import org.jboss.resteasy.plugins.server.vertx.VertxRequestHandler;
 import org.jboss.resteasy.plugins.server.vertx.VertxResteasyDeployment;
@@ -96,7 +100,9 @@
 
     @Provides
     @Singleton
-    private VertxRequestHandler configureServices(Vertx vertx, HealthService healthService)
+    private VertxRequestHandler configureServices(Vertx vertx,
+                                                  HealthService healthService,
+                                                  StreamSSTableComponent ssTableComponent)
     {
         VertxResteasyDeployment deployment = new VertxResteasyDeployment();
         deployment.start();
@@ -104,6 +110,7 @@
 
         r.addPerInstanceResource(SwaggerOpenApiResource.class);
         r.addSingletonResource(healthService);
+        r.addSingletonResource(ssTableComponent);
 
         return new VertxRequestHandler(vertx, deployment);
     }
@@ -122,7 +129,6 @@
         // Docs index.html page
         StaticHandler docs = StaticHandler.create("docs");
         router.route().path("/docs/*").handler(docs);
-
         return router;
     }
 
@@ -143,6 +149,7 @@
             return new Configuration.Builder()
                     .setCassandraHost(yamlConf.get(String.class, "cassandra.host"))
                     .setCassandraPort(yamlConf.get(Integer.class, "cassandra.port"))
+                    .setCassandraDataDirs(yamlConf.getList(String.class, "cassandra.data_dirs"))
                     .setHost(yamlConf.get(String.class, "sidecar.host"))
                     .setPort(yamlConf.get(Integer.class, "sidecar.port"))
                     .setHealthCheckFrequency(yamlConf.get(Integer.class, "healthcheck.poll_freq_millis"))
@@ -151,6 +158,9 @@
                     .setTrustStorePath(yamlConf.get(String.class, "sidecar.ssl.truststore.path", null))
                     .setTrustStorePassword(yamlConf.get(String.class, "sidecar.ssl.truststore.password", null))
                     .setSslEnabled(yamlConf.get(Boolean.class, "sidecar.ssl.enabled", false))
+                    .setRateLimitStreamRequestsPerSecond(yamlConf.getLong("sidecar.throttle.stream_requests_per_sec"))
+                    .setThrottleTimeoutInSeconds(yamlConf.getLong("sidecar.throttle.timeout_sec"))
+                    .setThrottleDelayInSeconds(yamlConf.getLong("sidecar.throttle.delay_sec"))
                     .build();
         }
         catch (MalformedURLException e)
@@ -184,4 +194,17 @@
     {
         return new CassandraAdapterDelegate(provider, session, config.getHealthCheckFrequencyMillis());
     }
+
+    @Provides
+    public SidecarRateLimiter rateLimiter(Configuration config)
+    {
+        return SidecarRateLimiter.create(config.getRateLimitStreamRequestsPerSecond());
+    }
+
+    @Provides
+    @Singleton
+    public FilePathBuilder filePathBuilder(Configuration config)
+    {
+        return new CachedFilePathBuilder(config.getCassandraDataDirs());
+    }
 }
diff --git a/src/main/java/org/apache/cassandra/sidecar/exceptions/RangeException.java b/src/main/java/org/apache/cassandra/sidecar/exceptions/RangeException.java
new file mode 100644
index 0000000..8ea135f
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/sidecar/exceptions/RangeException.java
@@ -0,0 +1,12 @@
+package org.apache.cassandra.sidecar.exceptions;
+
+/**
+ * Custom exception
+ */
+public class RangeException extends RuntimeException
+{
+    public RangeException(String msg)
+    {
+        super(msg);
+    }
+}
diff --git a/src/main/java/org/apache/cassandra/sidecar/models/HttpResponse.java b/src/main/java/org/apache/cassandra/sidecar/models/HttpResponse.java
new file mode 100644
index 0000000..708221e
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/sidecar/models/HttpResponse.java
@@ -0,0 +1,88 @@
+package org.apache.cassandra.sidecar.models;
+
+import java.io.File;
+
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.netty.handler.codec.http.HttpHeaderValues;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.http.HttpServerResponse;
+
+import static java.util.concurrent.TimeUnit.MICROSECONDS;
+
+/**
+ * Wrapper around HttpServerResponse
+ */
+public class HttpResponse
+{
+    private final HttpServerResponse response;
+
+    public HttpResponse(HttpServerResponse response)
+    {
+        this.response = response;
+    }
+
+    public void setRetryAfterHeader(long microsToWait)
+    {
+        response.setStatusCode(HttpResponseStatus.TOO_MANY_REQUESTS.code());
+        response.putHeader(HttpHeaderNames.RETRY_AFTER, Long.toString(MICROSECONDS.toSeconds(microsToWait))).end();
+    }
+
+    public void setTooManyRequestsStatus()
+    {
+        response.setStatusCode(HttpResponseStatus.TOO_MANY_REQUESTS.code()).end();
+    }
+
+    public void setRangeNotSatisfiable(String msg)
+    {
+        response.setStatusCode(HttpResponseStatus.REQUESTED_RANGE_NOT_SATISFIABLE.code()).setStatusMessage(msg).end();
+    }
+
+    public void setPartialContentStatus(Range range)
+    {
+        response.setStatusCode(HttpResponseStatus.PARTIAL_CONTENT.code())
+                .putHeader(HttpHeaderNames.CONTENT_RANGE, contentRangeHeader(range));
+    }
+
+    private String contentRangeHeader(Range r)
+    {
+        return "bytes " + r.start() + "-" + r.end() + "/" + r.length();
+    }
+
+    public void setBadRequestStatus(String msg)
+    {
+        response.setStatusCode(HttpResponseStatus.BAD_REQUEST.code()).setStatusMessage(msg).end();
+    }
+
+    public void setNotFoundStatus(String msg)
+    {
+        response.setStatusCode(HttpResponseStatus.NOT_FOUND.code()).setStatusMessage(msg).end();
+    }
+
+    public void setInternalErrorStatus(String msg)
+    {
+        response.setStatusCode(HttpResponseStatus.INTERNAL_SERVER_ERROR.code()).setStatusMessage(msg).end();
+    }
+
+    public void sendFile(File file)
+    {
+        response.putHeader(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_OCTET_STREAM)
+                .putHeader(HttpHeaderNames.CONTENT_LENGTH, Long.toString(file.length()))
+                .sendFile(file.getAbsolutePath());
+    }
+
+    public void sendFile(File file, Range range)
+    {
+        if (range.length() != file.length())
+        {
+            setPartialContentStatus(range);
+        }
+        response.putHeader(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_OCTET_STREAM)
+                .putHeader(HttpHeaderNames.CONTENT_LENGTH, Long.toString(range.length()))
+                .sendFile(file.getAbsolutePath(), range.start(), range.length());
+    }
+
+    public void setForbiddenStatus(String msg)
+    {
+        response.setStatusCode(HttpResponseStatus.FORBIDDEN.code()).setStatusMessage(msg).end();
+    }
+}
diff --git a/src/main/java/org/apache/cassandra/sidecar/models/Range.java b/src/main/java/org/apache/cassandra/sidecar/models/Range.java
new file mode 100644
index 0000000..411a2a6
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/sidecar/models/Range.java
@@ -0,0 +1,182 @@
+package org.apache.cassandra.sidecar.models;
+
+import java.util.Objects;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import javax.validation.constraints.NotNull;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.cassandra.sidecar.exceptions.RangeException;
+
+/**
+ * Accepted Range formats are start-end, start-, -suffix_length
+ * start-end (start = start index of the range, end = end index of the range, both inclusive)
+ * start- (start = start index of the range, end = end of file)
+ * -suffix-length (Requested length from end of file)
+ */
+public class Range
+{
+    private static final Pattern START_END = Pattern.compile("^(\\d+)-(\\d+)$");
+    private static final Pattern PARTIAL = Pattern.compile("^((\\d+)-)$|^(-(\\d+))$");
+    private static final String RANGE_UNIT = "bytes";
+    private final long start;
+    private final long end;
+    private final long length;
+
+    private Range(final long start, final long end)
+    {
+        this.start = start;
+        this.end = end;
+        this.length = length(start, end);
+    }
+
+    public Range(final long start, final long end, final long length)
+    {
+        this.start = start;
+        this.end = end;
+        this.length = length;
+    }
+
+    private long length(final long start, final long end)
+    {
+        if (start == 0 && end == Long.MAX_VALUE) // avoid overflow (extra byte)
+        {
+            return Long.MAX_VALUE;
+        }
+        return end - start + 1;
+    }
+
+    public long start()
+    {
+        return this.start;
+    }
+
+    public long end()
+    {
+        return this.end;
+    }
+
+    public long length()
+    {
+        return this.length;
+    }
+
+    public boolean isValidHttpRange()
+    {
+        return start >= 0 && end >= start && length > 0;
+    }
+
+    public Range intersect(@NotNull final Range range)
+    {
+        if (!(start >= range.start() && start <= range.end()) && !(end >= range.start() && end <= range.end()) &&
+            !(range.start() >= start && range.start() <= end) && !(range.end() >= start && range.end() <= end))
+        {
+            throw new RangeException("Range does not overlap");
+        }
+
+        return new Range(Math.max(start, range.start()), Math.min(end, range.end()));
+    }
+
+    /**
+     * Accepted string formats "start-end", both ends of the range required to be parsed
+     * Sample accepted formats "1-2", "232-2355"
+     */
+    private static Range parseAbsolute(@NotNull String range)
+    {
+        Matcher m = START_END.matcher(range);
+
+        if (!m.matches())
+        {
+            throw new IllegalArgumentException("Supported Range formats are <start>-<end>, <start>-, -<suffix-length>");
+        }
+
+        final long start = Long.parseLong(m.group(1));
+        Preconditions.checkArgument(start >= 0, "Range start can not be negative");
+
+        final long end = Long.parseLong(m.group(2));
+        if (end < start)
+        {
+            throw new RangeException("Range does not satisfy boundary requirements");
+        }
+        return new Range(start, end);
+    }
+
+    public static Range parse(@NotNull String range)
+    {
+        // since file size is not passed, we set it to 0
+        return parse(range, 0);
+    }
+
+    /**
+     * Accepted string formats "1453-3563", "-22344", "5346-"
+     * Sample invalid string formats "8-3", "-", "-0", "a-b"
+     *
+     * @param fileSize - passed in to convert partial range into absolute range
+     */
+    public static Range parse(@NotNull String range, final long fileSize)
+    {
+        Matcher m = PARTIAL.matcher(range);
+        if (!m.matches())
+        {
+            return parseAbsolute(range);
+        }
+
+        Preconditions.checkArgument(fileSize > 0, "Reference file size invalid");
+        if (range.startsWith("-"))
+        {
+            final long length = Long.parseLong(m.group(4));
+            if (length <= 0)
+            {
+                throw new IllegalArgumentException("Suffix length in " + range + " cannot be less than or equal to 0");
+            }
+            Preconditions.checkArgument(length <= fileSize, "Suffix length exceeds");
+            return new Range(fileSize - length, fileSize - 1, length);
+        }
+
+        final long start = Long.parseLong(m.group(2));
+        Preconditions.checkArgument(start >= 0, "Range start can not be negative");
+        Preconditions.checkArgument(start < fileSize, "Range exceeds");
+
+        return new Range(start, fileSize - 1);
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o)
+        {
+            return true;
+        }
+        if (!(o instanceof Range))
+        {
+            return false;
+        }
+        Range range = (Range) o;
+        return start == range.start &&
+                end == range.end &&
+                length == range.length;
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(start, end, length);
+    }
+
+    /**
+     * Accepted RangeHeader formats are bytes=start-end, bytes=start-, bytes=-suffix_length
+     */
+    public static Range parseHeader(final String header, final long fileSize)
+    {
+        if (header == null)
+        {
+            return new Range(0, fileSize - 1, fileSize);
+        }
+        if (!header.startsWith(RANGE_UNIT + "="))
+        {
+            throw new UnsupportedOperationException("Unsupported range unit only bytes are allowed");
+        }
+        return Range.parse(header.substring(header.indexOf("=") + 1), fileSize);
+    }
+}
diff --git a/src/main/java/org/apache/cassandra/sidecar/routes/StreamSSTableComponent.java b/src/main/java/org/apache/cassandra/sidecar/routes/StreamSSTableComponent.java
new file mode 100644
index 0000000..7eb3da8
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/sidecar/routes/StreamSSTableComponent.java
@@ -0,0 +1,99 @@
+package org.apache.cassandra.sidecar.routes;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.regex.Pattern;
+import javax.ws.rs.GET;
+import javax.ws.rs.HeaderParam;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.core.Context;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.vertx.core.http.HttpServerResponse;
+import org.apache.cassandra.sidecar.models.HttpResponse;
+import org.apache.cassandra.sidecar.models.Range;
+import org.apache.cassandra.sidecar.utils.FilePathBuilder;
+import org.apache.cassandra.sidecar.utils.FileStreamer;
+
+/**
+ * Handler for serving SSTable components from snapshot folders
+ */
+@Singleton
+@javax.ws.rs.Path("/api/v1/stream/keyspace/{keyspace}/table/{table}/snapshot/{snapshot}/component/{component}")
+public class StreamSSTableComponent
+{
+    private static final Pattern REGEX_DIR = Pattern.compile("[a-zA-Z0-9_-]+");
+    private static final Pattern REGEX_COMPONENT = Pattern.compile("[a-zA-Z0-9_-]+(.db|.cql|.json|.crc32|TOC.txt)");
+    private static final Set<String> FORBIDDEN_DIRS = new HashSet<>(
+            Arrays.asList("system_schema", "system_traces", "system_distributed", "system", "system_auth"));
+
+    private final FilePathBuilder pathBuilder;
+    private final FileStreamer fileStreamer;
+
+    @Inject
+    public StreamSSTableComponent(final FilePathBuilder pathBuilder, final FileStreamer fileStreamer)
+    {
+        this.pathBuilder = pathBuilder;
+        this.fileStreamer = fileStreamer;
+    }
+
+    @GET
+    public void handle(@PathParam("keyspace") String keyspace, @PathParam("table") String table,
+                       @PathParam("snapshot") String snapshot, @PathParam("component") String component,
+                       @HeaderParam("Range") String range, @Context HttpServerResponse resp)
+    {
+        final HttpResponse response = new HttpResponse(resp);
+        if (FORBIDDEN_DIRS.contains(keyspace))
+        {
+            response.setForbiddenStatus(keyspace + " keyspace is forbidden");
+            return;
+        }
+        if (!arePathParamsValid(keyspace, table, snapshot, component))
+        {
+            response.setBadRequestStatus("Invalid path params found");
+            return;
+        }
+
+        final Path path;
+        try
+        {
+            path = pathBuilder.build(keyspace, table, snapshot, component);
+        }
+        catch (FileNotFoundException e)
+        {
+            response.setNotFoundStatus(e.getMessage());
+            return;
+        }
+        final File file = path.toFile();
+        final Range r;
+        try
+        {
+            r = parseRangeHeader(range, file.length());
+        }
+        catch (Exception e)
+        {
+            response.setRangeNotSatisfiable(e.getMessage());
+            return;
+        }
+        fileStreamer.stream(response, file, r);
+    }
+
+    private boolean arePathParamsValid(String keyspace, String table, String snapshot, String component)
+    {
+        return REGEX_DIR.matcher(keyspace).matches() && REGEX_DIR.matcher(table).matches()
+                && REGEX_DIR.matcher(snapshot).matches() && REGEX_COMPONENT.matcher(component).matches();
+    }
+
+    private Range parseRangeHeader(String rangeHeader, long fileSize)
+    {
+        final Range fileRange = new Range(0, fileSize - 1, fileSize);
+        // sidecar does not support multiple ranges as of now
+        final Range headerRange = Range.parseHeader(rangeHeader, fileSize);
+        return fileRange.intersect(headerRange);
+    }
+}
diff --git a/src/main/java/org/apache/cassandra/sidecar/utils/CachedFilePathBuilder.java b/src/main/java/org/apache/cassandra/sidecar/utils/CachedFilePathBuilder.java
new file mode 100644
index 0000000..0c3aabd
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/sidecar/utils/CachedFilePathBuilder.java
@@ -0,0 +1,219 @@
+package org.apache.cassandra.sidecar.utils;
+
+import java.io.FileNotFoundException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.security.KeyException;
+import java.util.Collection;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+/**
+ * Path builder that caches intermediate paths
+ */
+@Singleton
+public class CachedFilePathBuilder extends FilePathBuilder
+{
+    private static final Logger logger = LoggerFactory.getLogger(CachedFilePathBuilder.class);
+    private final CacheLoader<Key, String> loader = new PathCacheLoader();
+    private final LoadingCache<Key, String> sstableCache = getCacheBuilder();
+    private final LoadingCache<Key, String> snapshotCache = getCacheBuilder();
+    private final LoadingCache<Key, String> tableCache = getCacheBuilder();
+    private final LoadingCache<Key, String> keyspaceCache = getCacheBuilder();
+
+    private LoadingCache<Key, String> getCacheBuilder()
+    {
+        return CacheBuilder.newBuilder().maximumSize(10000).refreshAfterWrite(5, TimeUnit.MINUTES).build(loader);
+    }
+
+    @Inject
+    public CachedFilePathBuilder(final Collection<String> dataDirs)
+    {
+        super(dataDirs);
+    }
+
+    public Path build(String keyspace, String table, String snapshot, String component) throws FileNotFoundException
+    {
+        try
+        {
+            return Paths.get(sstableCache.get(new Key.Builder().setKeyspace(keyspace).setTable(table)
+                    .setSnapshot(snapshot).setComponent(component).build()));
+        }
+        catch (Throwable t)
+        {
+            if (ExceptionUtils.getRootCause(t) instanceof FileNotFoundException)
+            {
+                throw (FileNotFoundException) ExceptionUtils.getRootCause(t);
+            }
+            else
+            {
+                logger.error("Unexpected error while building path ", t);
+                throw new RuntimeException("Error loading value from path cache");
+            }
+        }
+    }
+
+    /**
+     * Cache Loader for guava cache storing path to files
+     */
+    public class PathCacheLoader extends CacheLoader<Key, String>
+    {
+        @Override
+        public String load(Key key) throws FileNotFoundException, KeyException, ExecutionException
+        {
+            switch (key.type())
+            {
+                case KEYSPACE_TABLE_SNAPSHOT_COMPONENT:
+                    return addSSTableComponentToPath(key.component(), snapshotCache.get(new Key.Builder()
+                            .setKeyspace(key.keyspace()).setTable(key.table()).setSnapshot(key.snapshot()).build()));
+                case KEYSPACE_TABLE_SNAPSHOT:
+                    return addSnapshotToPath(key.snapshot(), tableCache.get(new Key.Builder()
+                            .setKeyspace(key.keyspace()).setTable(key.table()).build()));
+                case KEYSPACE_TABLE:
+                    return addTableToPath(key.table(), keyspaceCache.get(new Key.Builder().setKeyspace(key.keyspace())
+                            .build()));
+                case JUST_KEYSPACE:
+                    return addKeyspaceToPath(key.keyspace());
+                default:
+                    throw new KeyException();
+            }
+        }
+    }
+
+    /**
+     * Key to retrieve path information from cache
+     */
+    public static class Key
+    {
+        private final String keyspace;
+        private final String table;
+        private final String snapshot;
+        private final String component;
+        private final KeyType type;
+
+        private Key(String keyspace, String table, String snapshot, String component, KeyType type)
+        {
+            this.keyspace = keyspace;
+            this.table = table;
+            this.snapshot = snapshot;
+            this.component = component;
+            this.type = type;
+        }
+
+        public String keyspace() throws KeyException
+        {
+            return Optional.ofNullable(keyspace).orElseThrow(KeyException::new);
+        }
+
+        public String table() throws KeyException
+        {
+            return Optional.ofNullable(table).orElseThrow(KeyException::new);
+        }
+
+        public String snapshot() throws KeyException
+        {
+            return Optional.ofNullable(snapshot).orElseThrow(KeyException::new);
+        }
+
+        public String component() throws KeyException
+        {
+            return Optional.ofNullable(component).orElseThrow(KeyException::new);
+        }
+
+        public KeyType type()
+        {
+            return type;
+        }
+
+        @Override
+        public boolean equals(Object o)
+        {
+            if (this == o)
+            {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass())
+            {
+                return false;
+            }
+            Key key = (Key) o;
+            return type == key.type &&
+                   Objects.equals(keyspace, key.keyspace) &&
+                   Objects.equals(table, key.table) &&
+                   Objects.equals(snapshot, key.snapshot) &&
+                   Objects.equals(component, key.component);
+        }
+
+        @Override
+        public int hashCode()
+        {
+            return Objects.hash(keyspace, table, snapshot, component, type);
+        }
+
+        /**
+         * Builder class for Key
+         */
+        public static class Builder
+        {
+            private String keyspace;
+            private String table;
+            private String snapshot;
+            private String component;
+            private int length;
+
+            public Builder setKeyspace(String keyspace)
+            {
+                length++;
+                this.keyspace = keyspace;
+                return this;
+            }
+
+            public Builder setTable(String table)
+            {
+                length++;
+                this.table = table;
+                return this;
+            }
+
+            public Builder setSnapshot(String snapshot)
+            {
+                length++;
+                this.snapshot = snapshot;
+                return this;
+            }
+
+            public Builder setComponent(String component)
+            {
+                length++;
+                this.component = component;
+                return this;
+            }
+
+            public CachedFilePathBuilder.Key build()
+            {
+                return new CachedFilePathBuilder.Key(keyspace, table, snapshot, component,
+                        KeyType.values()[length - 1]);
+            }
+        }
+    }
+
+    /**
+     * Enum to hold types of keys created
+     */
+    public enum KeyType
+    {
+        JUST_KEYSPACE, KEYSPACE_TABLE, KEYSPACE_TABLE_SNAPSHOT, KEYSPACE_TABLE_SNAPSHOT_COMPONENT
+    }
+}
diff --git a/src/main/java/org/apache/cassandra/sidecar/utils/FilePathBuilder.java b/src/main/java/org/apache/cassandra/sidecar/utils/FilePathBuilder.java
new file mode 100644
index 0000000..334f58e
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/sidecar/utils/FilePathBuilder.java
@@ -0,0 +1,145 @@
+package org.apache.cassandra.sidecar.utils;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collection;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Builds file path after verifying it exists
+ */
+public abstract class FilePathBuilder
+{
+    private static final Logger logger = LoggerFactory.getLogger(FilePathBuilder.class);
+    private static final String dataSubDir = "/data";
+    private final Collection<String> dataDirs;
+
+    public FilePathBuilder(@NotNull final Collection<String> dataDirs)
+    {
+        this.dataDirs = dataDirs;
+    }
+
+    public abstract Path build(String keyspace, String table, String snapshot, String component)
+            throws FileNotFoundException;
+
+    String addKeyspaceToPath(String keyspace) throws FileNotFoundException
+    {
+        for (String dir : dataDirs)
+        {
+            StringBuilder path = new StringBuilder(dir);
+            if (addFileToPathIfPresent(path, keyspace, true))
+            {
+                return path.toString();
+            }
+
+            if (dir.endsWith(dataSubDir))
+            {
+                continue;
+            }
+
+            // check in "data" sub directory
+            if (addFileToPathIfPresent(path.append(dataSubDir), keyspace, true))
+            {
+                return path.toString();
+            }
+        }
+        throw new FileNotFoundException("Keyspace " + keyspace + " does not exist");
+    }
+
+    String addTableToPath(String table, String path) throws FileNotFoundException
+    {
+        final StringBuilder modifiedPath = new StringBuilder(path);
+        if (addFileToPathIfPresent(modifiedPath, table, false))
+        {
+            return modifiedPath.toString();
+        }
+        throw new FileNotFoundException("Table " + table + " not found, path searched: " + path);
+    }
+
+    String addSnapshotToPath(String snapshot, String path) throws FileNotFoundException
+    {
+        final StringBuilder modifiedPath = new StringBuilder(path);
+        if (addFileToPathIfPresent(modifiedPath.append("/snapshots"), snapshot, true))
+        {
+            return modifiedPath.toString();
+        }
+        throw new FileNotFoundException("Snapshot " + snapshot + " not found, path searched: " + path);
+    }
+
+    String addSSTableComponentToPath(String component, String path) throws FileNotFoundException
+    {
+        final StringBuilder modifiedPath = new StringBuilder(path);
+        if (addFileToPathIfPresent(modifiedPath, component, true))
+        {
+            return modifiedPath.toString();
+        }
+        throw new FileNotFoundException("Component " + component + " not found, path searched: " + path);
+    }
+
+    private boolean addFileToPathIfPresent(StringBuilder path, String file, boolean checkEqual)
+            throws FileNotFoundException
+    {
+        final Path fileDir = Paths.get(path.toString());
+        if (!checkDirExists(fileDir))
+        {
+            throw new FileNotFoundException(fileDir + " directory empty or does not exist!");
+        }
+
+        try
+        {
+            Path finalPath = null;
+            try (final DirectoryStream<Path> dirEntries = Files.newDirectoryStream(fileDir))
+            {
+                for (Path entry : dirEntries)
+                {
+                    final Path filePath = entry.getFileName();
+                    if (filePath == null)
+                    {
+                        continue;
+                    }
+                    final String fileName = filePath.toString();
+                    if (fileName.equals(file) || (!checkEqual && fileName.startsWith(file + "-")))
+                    {
+                        if (finalPath == null
+                                || Files.getLastModifiedTime(entry).compareTo(Files.getLastModifiedTime(finalPath)) > 0)
+                        {
+                            finalPath = entry;
+                        }
+                    }
+                }
+                if (finalPath != null)
+                {
+                    final Path finalFilePath = finalPath.getFileName();
+                    if (finalFilePath == null)
+                    {
+                        return false;
+                    }
+                    final String finalFileName = finalFilePath.toString();
+                    path.append('/').append(finalFileName);
+                    return true;
+                }
+            }
+        }
+        catch (IOException e)
+        {
+            logger.error("Error listing files in path {}, could not add file {} to path", path, file, e);
+            throw new RuntimeException("Failed to list files in path " + path);
+        }
+        return false;
+    }
+
+    private boolean checkDirExists(final Path path)
+    {
+        final File file = new File(path.toString());
+        return file.exists() && file.isDirectory();
+    }
+}
diff --git a/src/main/java/org/apache/cassandra/sidecar/utils/FileStreamer.java b/src/main/java/org/apache/cassandra/sidecar/utils/FileStreamer.java
new file mode 100644
index 0000000..b90f231
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/sidecar/utils/FileStreamer.java
@@ -0,0 +1,113 @@
+package org.apache.cassandra.sidecar.utils;
+
+import java.io.File;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.util.concurrent.SidecarRateLimiter;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.cassandra.sidecar.Configuration;
+import org.apache.cassandra.sidecar.models.HttpResponse;
+import org.apache.cassandra.sidecar.models.Range;
+
+import static java.util.concurrent.TimeUnit.MICROSECONDS;
+
+/**
+ * General handler for serving files
+ */
+@Singleton
+public class FileStreamer
+{
+    private static final Logger LOGGER = LoggerFactory.getLogger(FileStreamer.class);
+    private static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(1,
+            new ThreadFactoryBuilder().setNameFormat("acquirePermit").setDaemon(true).build());
+    private final Duration delay;
+    private final Duration timeout;
+
+    private final SidecarRateLimiter rateLimiter;
+
+    @Inject
+    public FileStreamer(Configuration config, SidecarRateLimiter rateLimiter)
+    {
+        this.rateLimiter = rateLimiter;
+        this.delay = Duration.ofSeconds(config.getThrottleDelayInSeconds());
+        this.timeout = Duration.ofSeconds(config.getThrottleTimeoutInSeconds());
+    }
+
+    public void stream(final HttpResponse resp, final File file)
+    {
+        stream(resp, file, new Range(0, file.length() - 1, file.length()));
+    }
+
+    public void stream(final HttpResponse resp, final File file, final Range range)
+    {
+        if (!file.exists() || !file.isFile())
+        {
+            resp.setNotFoundStatus("File does not exist or it is not a normal file");
+            return;
+        }
+        if (file.length() == 0)
+        {
+            resp.setBadRequestStatus("File is empty");
+            return;
+        }
+        acquireAndSend(resp, file, range);
+    }
+
+    private void acquireAndSend(HttpResponse response, File file, Range range)
+    {
+        acquireAndSend(response, file, range, Instant.now());
+    }
+
+    /**
+     * If permit becomes available within a short time, retry immediately
+     */
+    private void acquireAndSend(HttpResponse response, File file, Range range, Instant startTime)
+    {
+        while (!rateLimiter.tryAcquire())
+        {
+            if (checkRetriesExhausted(startTime))
+            {
+                LOGGER.error("Retries for acquiring permit exhausted!");
+                response.setTooManyRequestsStatus();
+                return;
+            }
+
+            final long microsToWait = rateLimiter.queryEarliestAvailable(0L);
+            if (microsToWait <= 0) // immediately retry
+            {
+                continue;
+            }
+
+            if (TimeUnit.MICROSECONDS.toNanos(microsToWait) >= delay.getNano())
+            {
+                response.setRetryAfterHeader(microsToWait);
+            }
+            else
+            {
+                retryStreaming(response, file, range, startTime, microsToWait);
+            }
+            return;
+        }
+        LOGGER.info("File {} streamed from path {}", file.getName(), file.getAbsolutePath());
+        response.sendFile(file, range);
+    }
+
+    private boolean checkRetriesExhausted(Instant startTime)
+    {
+        return startTime.plus(timeout).isBefore(Instant.now());
+    }
+
+    private void retryStreaming(HttpResponse response, File file, Range range, Instant startTime, long microsToSleep)
+    {
+        SCHEDULER.schedule(() -> acquireAndSend(response, file, range, startTime), microsToSleep, MICROSECONDS);
+    }
+}
diff --git a/src/test/java/org/apache/cassandra/sidecar/FilePathBuilderTest.java b/src/test/java/org/apache/cassandra/sidecar/FilePathBuilderTest.java
new file mode 100644
index 0000000..734ca22
--- /dev/null
+++ b/src/test/java/org/apache/cassandra/sidecar/FilePathBuilderTest.java
@@ -0,0 +1,126 @@
+package org.apache.cassandra.sidecar;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Collections;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.util.Modules;
+import org.apache.cassandra.sidecar.utils.CachedFilePathBuilder;
+import org.apache.cassandra.sidecar.utils.FilePathBuilder;
+import org.assertj.core.api.Assertions;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * FilePathBuilderTest
+ */
+public class FilePathBuilderTest
+{
+    private static final String expectedFilePath = "src/test/resources/data/TestKeyspace" +
+            "/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshots/TestSnapshot" +
+            "/TestKeyspace-TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b-Data.db";
+    private static FilePathBuilder pathBuilder;
+
+    @BeforeAll
+    public static void setUp()
+    {
+        Injector injector = Guice.createInjector(Modules.override(new MainModule()).with(new TestModule()));
+        pathBuilder = injector.getInstance(FilePathBuilder.class);
+    }
+
+    @Test
+    public void testRoute() throws IOException
+    {
+        final String keyspace = "TestKeyspace";
+        final String table = "TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b";
+        final String snapshot = "TestSnapshot";
+        final String component = "TestKeyspace-TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b-Data.db";
+        Path filePath = pathBuilder.build(keyspace, table, snapshot, component);
+        assertEquals(expectedFilePath, filePath.toString());
+    }
+
+    @Test
+    public void testKeyspaceNotFound()
+    {
+        final String keyspace = "random";
+        final String table = "TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b";
+        final String snapshot = "TestSnapshot";
+        final String component = "TestKeyspace-TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b-Data.db";
+        FileNotFoundException thrownException = assertThrows(FileNotFoundException.class, () ->
+        {
+            pathBuilder.build(keyspace, table, snapshot, component);
+        });
+        String msg = "Keyspace random does not exist";
+        assertEquals(msg, thrownException.getMessage());
+    }
+
+    @Test
+    public void testTableNotFound()
+    {
+        final String keyspace = "TestKeyspace";
+        final String table = "random";
+        final String snapshot = "TestSnapshot";
+        final String component = "TestKeyspace-TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b-Data.db";
+        FileNotFoundException thrownException = assertThrows(FileNotFoundException.class, () ->
+        {
+            pathBuilder.build(keyspace, table, snapshot, component);
+        });
+        String msg = "Table random not found, path searched: src/test/resources/data/TestKeyspace";
+        assertEquals(msg, thrownException.getMessage());
+    }
+
+    @Test
+    public void testSnapshotNotFound()
+    {
+        final String keyspace = "TestKeyspace";
+        final String table = "TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b";
+        final String snapshot = "random";
+        final String component = "TestKeyspace-TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b-Data.db";
+        FileNotFoundException thrownException = assertThrows(FileNotFoundException.class, () ->
+        {
+            pathBuilder.build(keyspace, table, snapshot, component);
+        });
+        String msg = "Snapshot random not found, path searched: src/test/resources/data/TestKeyspace" +
+                     "/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b";
+        assertEquals(msg, thrownException.getMessage());
+    }
+
+    @Test
+    public void testPartialTableName() throws FileNotFoundException
+    {
+        final String keyspace = "TestKeyspace";
+        final String table = "TestTable";
+        final String snapshot = "TestSnapshot";
+        final String component = "TestKeyspace-TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b-Data.db";
+        Path filePath = pathBuilder.build(keyspace, table, snapshot, component);
+        assertEquals(expectedFilePath, filePath.toString());
+    }
+
+    @Test
+    public void testEmptyDataDir() throws IOException
+    {
+        String dataDir = new File("./").getCanonicalPath() + "/src/test/resources/instance";
+
+        final String keyspace = "TestKeyspace";
+        final String table = "TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b";
+        final String snapshot = "TestSnapshot";
+        final String component = "TestKeyspace-TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b-Data.db";
+
+        FilePathBuilder pathBuilder = new CachedFilePathBuilder(Collections.singletonList(dataDir));
+        FileNotFoundException thrownException = assertThrows(FileNotFoundException.class, () ->
+        {
+            pathBuilder.build(keyspace, table, snapshot, component);
+        });
+        String msg = "directory empty or does not exist!";
+        Assertions.assertThat(thrownException.getMessage()).contains(msg);
+    }
+}
diff --git a/src/test/java/org/apache/cassandra/sidecar/RangeTest.java b/src/test/java/org/apache/cassandra/sidecar/RangeTest.java
new file mode 100644
index 0000000..82c5d5e
--- /dev/null
+++ b/src/test/java/org/apache/cassandra/sidecar/RangeTest.java
@@ -0,0 +1,86 @@
+package org.apache.cassandra.sidecar;
+
+import org.junit.jupiter.api.Test;
+
+import org.apache.cassandra.sidecar.exceptions.RangeException;
+import org.apache.cassandra.sidecar.models.Range;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * RangeTest
+ */
+public class RangeTest
+{
+    @Test
+    public void testValidPartialRange()
+    {
+        final String rangeHeaderVal = "bytes=2-";
+        final Range range = Range.parseHeader(rangeHeaderVal, 5);
+        assertEquals(3, range.length());
+        assertEquals(2, range.start());
+        assertEquals(4, range.end());
+        assertTrue(range.isValidHttpRange());
+    }
+
+    @Test
+    public void testValidFullRange()
+    {
+        final String rangeHeaderVal = "bytes=0-100";
+        final Range range = Range.parseHeader(rangeHeaderVal, 500);
+        assertEquals(101, range.length());
+        assertEquals(0, range.start());
+        assertEquals(100, range.end());
+        assertTrue(range.isValidHttpRange());
+    }
+
+    @Test
+    public void testInvalidRangeFormat()
+    {
+        final String rangeVal = "2344--3432";
+        IllegalArgumentException thrownException = assertThrows(IllegalArgumentException.class, () ->
+        {
+            Range.parse(rangeVal);
+        });
+        String msg = "Supported Range formats are <start>-<end>, <start>-, -<suffix-length>";
+        assertEquals(msg, thrownException.getMessage());
+    }
+
+    @Test
+    public void testInvalidSuffixLength()
+    {
+        final String rangeVal = "-0";
+        IllegalArgumentException thrownException = assertThrows(IllegalArgumentException.class, () ->
+        {
+            Range.parse(rangeVal, Long.MAX_VALUE);
+        });
+        String msg = "Suffix length in -0 cannot be less than or equal to 0";
+        assertEquals(msg, thrownException.getMessage());
+    }
+
+    @Test
+    public void testInvalidRangeBoundary()
+    {
+        final String rangeVal = "9-2";
+        RangeException thrownException = assertThrows(RangeException.class, () ->
+        {
+            Range.parse(rangeVal);
+        });
+        String msg = "Range does not satisfy boundary requirements";
+        assertEquals(msg, thrownException.getMessage());
+    }
+
+    @Test
+    public void testWrongRangeUnitUsed()
+    {
+        final String rangeVal = "bits=0-";
+        UnsupportedOperationException thrownException = assertThrows(UnsupportedOperationException.class, () ->
+        {
+            Range.parseHeader(rangeVal, 5);
+        });
+        String msg = "Unsupported range unit only bytes are allowed";
+        assertEquals(msg, thrownException.getMessage());
+    }
+}
diff --git a/src/test/java/org/apache/cassandra/sidecar/StreamSSTableComponentTest.java b/src/test/java/org/apache/cassandra/sidecar/StreamSSTableComponentTest.java
new file mode 100644
index 0000000..b6cc605
--- /dev/null
+++ b/src/test/java/org/apache/cassandra/sidecar/StreamSSTableComponentTest.java
@@ -0,0 +1,308 @@
+package org.apache.cassandra.sidecar;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.util.Modules;
+import io.vertx.core.Vertx;
+import io.vertx.core.http.HttpServer;
+import io.vertx.ext.web.client.WebClient;
+import io.vertx.ext.web.codec.BodyCodec;
+import io.vertx.junit5.VertxExtension;
+import io.vertx.junit5.VertxTestContext;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test for StreamSSTableComponent
+ */
+@ExtendWith(VertxExtension.class)
+public class StreamSSTableComponentTest
+{
+    private static final Logger logger = LoggerFactory.getLogger(StreamSSTableComponentTest.class);
+    private Vertx vertx;
+    private HttpServer server;
+    private Configuration config;
+
+    @BeforeEach
+    void setUp() throws InterruptedException
+    {
+        Injector injector = Guice.createInjector(Modules.override(new MainModule()).with(new TestModule()));
+        server = injector.getInstance(HttpServer.class);
+        vertx = injector.getInstance(Vertx.class);
+        config = injector.getInstance(Configuration.class);
+
+        VertxTestContext context = new VertxTestContext();
+        server.listen(config.getPort(), context.completing());
+
+        context.awaitCompletion(5, TimeUnit.SECONDS);
+    }
+
+    @AfterEach
+    void tearDown() throws InterruptedException
+    {
+        final CountDownLatch closeLatch = new CountDownLatch(1);
+        server.close(res -> closeLatch.countDown());
+        vertx.close();
+        if (closeLatch.await(60, TimeUnit.SECONDS))
+            logger.info("Close event received before timeout.");
+        else
+            logger.error("Close event timed out.");
+    }
+
+    @Test
+    void testRoute(VertxTestContext context)
+    {
+        WebClient client = WebClient.create(vertx);
+        String testRoute = "/keyspace/TestKeyspace/table/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshot" +
+                "/TestSnapshot/component/TestKeyspace-TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b-Data.db";
+        client.get(config.getPort(), "localhost", "/api/v1/stream" + testRoute)
+                .as(BodyCodec.buffer())
+                .send(context.succeeding(response -> context.verify(() ->
+                {
+                    assertEquals(200, response.statusCode());
+                    assertEquals("data", response.bodyAsString());
+                    context.completeNow();
+                })));
+    }
+
+    @Test
+    void testKeyspaceNotFound(VertxTestContext context)
+    {
+        WebClient client = WebClient.create(vertx);
+        String testRoute = "/keyspace/random/table/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshot" +
+                "/TestSnapshot/component/TestKeyspace-TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b-Data.db";
+        client.get(config.getPort(), config.getHost(), "/api/v1/stream" + testRoute)
+                .send(context.succeeding(response -> context.verify(() ->
+                {
+                    assertEquals(404, response.statusCode());
+                    context.completeNow();
+                })));
+    }
+
+    @Test
+    void testSnapshotNotFound(VertxTestContext context)
+    {
+        WebClient client = WebClient.create(vertx);
+        String testRoute = "/keyspace/TestKeyspace/table/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshot" +
+                "/random/component/TestKeyspace-TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b-Data.db";
+        client.get(config.getPort(), config.getHost(), "/api/v1/stream" + testRoute)
+                .send(context.succeeding(response -> context.verify(() ->
+                {
+                    assertEquals(404, response.statusCode());
+                    context.completeNow();
+                })));
+    }
+
+    @Test
+    void testForbiddenKeyspace(VertxTestContext context)
+    {
+        WebClient client = WebClient.create(vertx);
+        String testRoute = "/keyspace/system/table/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshot" +
+                "/TestSnapshot/component/TestKeyspace-TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b-Data.db";
+        client.get(config.getPort(), config.getHost(), "/api/v1/stream" + testRoute)
+                .send(context.succeeding(response -> context.verify(() ->
+                {
+                    assertEquals(403, response.statusCode());
+                    assertEquals("system keyspace is forbidden", response.statusMessage());
+                    context.completeNow();
+                })));
+    }
+
+    @Test
+    void testIncorrectKeyspaceFormat(VertxTestContext context)
+    {
+        WebClient client = WebClient.create(vertx);
+        String testRoute = "/keyspace/k*s/table/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshot" +
+                "/TestSnapshot/component/TestKeyspace-TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b-Data.db";
+        client.get(config.getPort(), config.getHost(), "/api/v1/stream" + testRoute)
+                .send(context.succeeding(response -> context.verify(() ->
+                {
+                    assertEquals(400, response.statusCode());
+                    assertEquals("Invalid path params found", response.statusMessage());
+                    context.completeNow();
+                })));
+    }
+
+    @Test
+    void testIncorrectComponentFormat(VertxTestContext context)
+    {
+        WebClient client = WebClient.create(vertx);
+        String testRoute = "/keyspace/TestKeyspace/table/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshot" +
+                "/TestSnapshot/component/TestKeyspace-TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b-Data...db";
+        client.get(config.getPort(), config.getHost(), "/api/v1/stream" + testRoute)
+                .send(context.succeeding(response -> context.verify(() ->
+                {
+                    assertEquals(400, response.statusCode());
+                    assertEquals("Invalid path params found", response.statusMessage());
+                    context.completeNow();
+                })));
+    }
+
+    @Test
+    void testAccessDeniedToCertainComponents(VertxTestContext context)
+    {
+        WebClient client = WebClient.create(vertx);
+        String testRoute = "/keyspace/TestKeyspace/table/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshot" +
+                "/TestSnapshot/component/TestKeyspace-TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b-Digest.crc32d";
+        client.get(config.getPort(), config.getHost(), "/api/v1/stream" + testRoute)
+                .send(context.succeeding(response -> context.verify(() ->
+                {
+                    assertEquals(400, response.statusCode());
+                    assertEquals("Invalid path params found", response.statusMessage());
+                    context.completeNow();
+                })));
+    }
+
+    @Test
+    void testPartialTableName(VertxTestContext context)
+    {
+        WebClient client = WebClient.create(vertx);
+        String testRoute = "/keyspace/TestKeyspace/table/TestTable/snapshot/TestSnapshot/component" +
+                "/TestKeyspace-TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b-Data.db";
+        client.get(config.getPort(), config.getHost(), "/api/v1/stream" + testRoute)
+                .putHeader("Range", "bytes=0-")
+                .as(BodyCodec.buffer())
+                .send(context.succeeding(response -> context.verify(() ->
+                {
+                    assertEquals(200, response.statusCode());
+                    assertEquals("data", response.bodyAsString());
+                    context.completeNow();
+                })));
+    }
+
+    @Test
+    void testInvalidRange(VertxTestContext context)
+    {
+        WebClient client = WebClient.create(vertx);
+        String testRoute = "/keyspace/TestKeyspace/table/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshot" +
+                "/TestSnapshot/component/TestKeyspace-TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b-Data.db";
+        client.get(config.getPort(), config.getHost(), "/api/v1/stream" + testRoute)
+                .putHeader("Range", "bytes=4-3")
+                .send(context.succeeding(response -> context.verify(() ->
+                {
+                    assertEquals(416, response.statusCode());
+                    context.completeNow();
+                })));
+    }
+
+    @Test
+    void testRangeExceeds(VertxTestContext context)
+    {
+        WebClient client = WebClient.create(vertx);
+        String testRoute = "/keyspace/TestKeyspace/table/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshot" +
+                "/TestSnapshot/component/TestKeyspace-TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b-Data.db";
+        client.get(config.getPort(), config.getHost(), "/api/v1/stream" + testRoute)
+                .putHeader("Range", "bytes=5-9")
+                .send(context.succeeding(response -> context.verify(() ->
+                {
+                    assertEquals(416, response.statusCode());
+                    context.completeNow();
+                })));
+    }
+
+    @Test
+    void testPartialRangeExceeds(VertxTestContext context)
+    {
+        WebClient client = WebClient.create(vertx);
+        String testRoute = "/keyspace/TestKeyspace/table/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshot" +
+                "/TestSnapshot/component/TestKeyspace-TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b-Data.db";
+        client.get(config.getPort(), config.getHost(), "/api/v1/stream" + testRoute)
+                .putHeader("Range", "bytes=5-")
+                .send(context.succeeding(response -> context.verify(() ->
+                {
+                    assertEquals(416, response.statusCode());
+                    context.completeNow();
+                })));
+    }
+
+    @Test
+    void testRangeBoundaryExceeds(VertxTestContext context)
+    {
+        WebClient client = WebClient.create(vertx);
+        String testRoute = "/keyspace/TestKeyspace/table/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshot" +
+                "/TestSnapshot/component/TestKeyspace-TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b-Data.db";
+        client.get(config.getPort(), config.getHost(), "/api/v1/stream" + testRoute)
+                .putHeader("Range", "bytes=0-999999")
+                .as(BodyCodec.buffer())
+                .send(context.succeeding(response -> context.verify(() ->
+                {
+                    assertEquals(200, response.statusCode());
+                    assertEquals("data", response.bodyAsString());
+                    context.completeNow();
+                })));
+    }
+
+    @Test
+    void testPartialRangeStreamed(VertxTestContext context)
+    {
+        WebClient client = WebClient.create(vertx);
+        String testRoute = "/keyspace/TestKeyspace/table/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshot" +
+                "/TestSnapshot/component/TestKeyspace-TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b-Data.db";
+        client.get(config.getPort(), config.getHost(), "/api/v1/stream" + testRoute)
+                .putHeader("Range", "bytes=0-2") // 3 bytes streamed
+                .as(BodyCodec.buffer())
+                .send(context.succeeding(response -> context.verify(() ->
+                {
+                    assertEquals(206, response.statusCode());
+                    assertEquals("dat", response.bodyAsString());
+                    context.completeNow();
+                })));
+    }
+
+    @Test
+    void testSuffixRange(VertxTestContext context)
+    {
+        WebClient client = WebClient.create(vertx);
+        String testRoute = "/keyspace/TestKeyspace/table/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshot" +
+                "/TestSnapshot/component/TestKeyspace-TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b-Data.db";
+        client.get(config.getPort(), config.getHost(), "/api/v1/stream" + testRoute)
+                .putHeader("Range", "bytes=-2") // last 2 bytes streamed
+                .as(BodyCodec.buffer())
+                .send(context.succeeding(response -> context.verify(() ->
+                {
+                    assertEquals(206, response.statusCode());
+                    assertEquals("ta", response.bodyAsString());
+                    context.completeNow();
+                })));
+    }
+
+    @Test
+    void testSuffixRangeExceeds(VertxTestContext context)
+    {
+        WebClient client = WebClient.create(vertx);
+        String testRoute = "/keyspace/TestKeyspace/table/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshot" +
+                "/TestSnapshot/component/TestKeyspace-TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b-Data.db";
+        client.get(config.getPort(), config.getHost(), "/api/v1/stream" + testRoute)
+                .putHeader("Range", "bytes=-5")
+                .send(context.succeeding(response -> context.verify(() ->
+                {
+                    assertEquals(416, response.statusCode());
+                    context.completeNow();
+                })));
+    }
+
+    @Test
+    void testInvalidRangeUnit(VertxTestContext context)
+    {
+        WebClient client = WebClient.create(vertx);
+        String testRoute = "/keyspace/TestKeyspace/table/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshot" +
+                "/TestSnapshot/component/TestKeyspace-TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b-Data.db";
+        client.get(config.getPort(), config.getHost(), "/api/v1/stream" + testRoute)
+                .putHeader("Range", "bits=0-2")
+                .send(context.succeeding(response -> context.verify(() ->
+                {
+                    assertEquals(416, response.statusCode());
+                    context.completeNow();
+                })));
+    }
+}
diff --git a/src/test/java/org/apache/cassandra/sidecar/TestModule.java b/src/test/java/org/apache/cassandra/sidecar/TestModule.java
index 67c28d0..755ad4a 100644
--- a/src/test/java/org/apache/cassandra/sidecar/TestModule.java
+++ b/src/test/java/org/apache/cassandra/sidecar/TestModule.java
@@ -18,13 +18,14 @@
 
 package org.apache.cassandra.sidecar;
 
+import java.util.Collections;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.inject.AbstractModule;
 import com.google.inject.Provides;
 import com.google.inject.Singleton;
-
 import org.apache.cassandra.sidecar.common.CassandraAdapterDelegate;
 import org.apache.cassandra.sidecar.common.CassandraVersionProvider;
 import org.apache.cassandra.sidecar.common.MockCassandraFactory;
@@ -66,10 +67,14 @@
         return new Configuration.Builder()
                            .setCassandraHost("INVALID_FOR_TEST")
                            .setCassandraPort(0)
+                           .setCassandraDataDirs(Collections.singletonList("src/test/resources/data"))
                            .setHost("127.0.0.1")
                            .setPort(6475)
                            .setHealthCheckFrequency(1000)
                            .setSslEnabled(false)
+                           .setRateLimitStreamRequestsPerSecond(1)
+                           .setThrottleDelayInSeconds(5)
+                           .setThrottleTimeoutInSeconds(10)
                            .build();
     }
 
@@ -85,5 +90,4 @@
         builder.add(new MockCassandraFactory());
         return builder.build();
     }
-
 }
diff --git a/src/test/java/org/apache/cassandra/sidecar/TestSslModule.java b/src/test/java/org/apache/cassandra/sidecar/TestSslModule.java
index 883e04e..4ecb9f2 100644
--- a/src/test/java/org/apache/cassandra/sidecar/TestSslModule.java
+++ b/src/test/java/org/apache/cassandra/sidecar/TestSslModule.java
@@ -19,6 +19,7 @@
 package org.apache.cassandra.sidecar;
 
 import java.io.File;
+import java.util.Collections;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -52,6 +53,7 @@
         return new Configuration.Builder()
                            .setCassandraHost("INVALID_FOR_TEST")
                            .setCassandraPort(0)
+                           .setCassandraDataDirs(Collections.singletonList("src/test/resources/data"))
                            .setHost("127.0.0.1")
                            .setPort(6475)
                            .setHealthCheckFrequency(1000)
@@ -60,6 +62,7 @@
                            .setTrustStorePath(trustStorePath)
                            .setTrustStorePassword(trustStorePassword)
                            .setSslEnabled(true)
+                           .setRateLimitStreamRequestsPerSecond(1)
                            .build();
     }
 }
diff --git a/src/test/java/org/apache/cassandra/sidecar/ThrottleTest.java b/src/test/java/org/apache/cassandra/sidecar/ThrottleTest.java
new file mode 100644
index 0000000..d77c8e5
--- /dev/null
+++ b/src/test/java/org/apache/cassandra/sidecar/ThrottleTest.java
@@ -0,0 +1,109 @@
+package org.apache.cassandra.sidecar;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.util.Modules;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.Vertx;
+import io.vertx.core.http.HttpServer;
+import io.vertx.ext.web.client.HttpResponse;
+import io.vertx.ext.web.client.WebClient;
+import io.vertx.ext.web.codec.BodyCodec;
+import io.vertx.junit5.VertxExtension;
+import io.vertx.junit5.VertxTestContext;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Test rate limiting stream requests
+ */
+@ExtendWith(VertxExtension.class)
+public class ThrottleTest
+{
+    private static final Logger logger = LoggerFactory.getLogger(ThrottleTest.class);
+    private Vertx vertx;
+    private HttpServer server;
+    private Configuration config;
+
+    @BeforeEach
+    void setUp() throws InterruptedException
+    {
+
+        Injector injector = Guice.createInjector(Modules.override(new MainModule()).with(new TestModule()));
+        server = injector.getInstance(HttpServer.class);
+        vertx = injector.getInstance(Vertx.class);
+        config = injector.getInstance(Configuration.class);
+
+        VertxTestContext context = new VertxTestContext();
+        server.listen(config.getPort(), context.completing());
+
+        context.awaitCompletion(5, SECONDS);
+    }
+
+    @AfterEach
+    void tearDown() throws InterruptedException
+    {
+        final CountDownLatch closeLatch = new CountDownLatch(1);
+        server.close(res -> closeLatch.countDown());
+        vertx.close();
+        if (closeLatch.await(60, SECONDS))
+            logger.info("Close event received before timeout.");
+        else
+            logger.error("Close event timed out.");
+    }
+
+    @Test
+    void testStreamRequestsThrottled() throws Exception
+    {
+        String testRoute = "/keyspace/TestKeyspace/table/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshot" +
+                "/TestSnapshot/component/TestKeyspace-TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b-Data.db";
+
+        for (int i = 0; i < 20; i++)
+        {
+            unblockingClientRequest(testRoute);
+        }
+
+        HttpResponse response = blockingClientRequest(testRoute);
+        assertEquals(HttpResponseStatus.TOO_MANY_REQUESTS.code(), response.statusCode());
+
+        long secsToWait = Long.parseLong(response.getHeader("Retry-After"));
+        Thread.sleep(SECONDS.toMillis(secsToWait));
+
+        HttpResponse finalResp = blockingClientRequest(testRoute);
+        assertEquals(HttpResponseStatus.OK.code(), finalResp.statusCode());
+        assertEquals("data", finalResp.bodyAsString());
+    }
+
+    private void unblockingClientRequest(String route)
+    {
+        WebClient client = WebClient.create(vertx);
+        client.get(config.getPort(), "localhost", "/api/v1/stream" + route)
+                .as(BodyCodec.buffer())
+                .send(resp ->
+                {
+                    // do nothing
+                });
+    }
+
+    private HttpResponse blockingClientRequest(String route) throws ExecutionException, InterruptedException
+    {
+        WebClient client = WebClient.create(vertx);
+        CompletableFuture<HttpResponse> future = new CompletableFuture<>();
+        client.get(config.getPort(), "localhost", "/api/v1/stream" + route)
+                .as(BodyCodec.buffer())
+                .send(resp -> future.complete(resp.result()));
+        return future.get();
+    }
+}
diff --git a/src/test/resources/data/TestKeyspace/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshots/TestSnapshot/TestKeyspace-TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b-Data.db b/src/test/resources/data/TestKeyspace/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshots/TestSnapshot/TestKeyspace-TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b-Data.db
new file mode 100644
index 0000000..6320cd2
--- /dev/null
+++ b/src/test/resources/data/TestKeyspace/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshots/TestSnapshot/TestKeyspace-TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b-Data.db
@@ -0,0 +1 @@
+data
\ No newline at end of file