[BAHIR-213] Faster S3 file Source for Structured Streaming with SQS (#91)

Using FileStreamSource to read files from a S3 bucket has problems 
both in terms of costs and latency:

Latency: Listing all the files in S3 buckets every micro-batch can be both
slow and resource-intensive.

Costs: Making List API requests to S3 every micro-batch can be costly.

The solution is to use Amazon Simple Queue Service (SQS) which lets 
you find new files written to S3 bucket without the need to list all the 
files every micro-batch.

S3 buckets can be configured to send a notification to an Amazon SQS Queue
on Object Create / Object Delete events. For details see AWS documentation
here Configuring S3 Event Notifications

Spark can leverage this to find new files written to S3 bucket by reading 
notifications from SQS queue instead of listing files every micro-batch.

This PR adds a new SQSSource which uses Amazon SQS queue to find 
new files every micro-batch.

Usage

val inputDf = spark .readStream
   .format("s3-sqs")
   .schema(schema)
   .option("fileFormat", "json")
   .option("sqsUrl", "https://QUEUE_URL")
   .option("region", "us-east-1")
   .load()
diff --git a/pom.xml b/pom.xml
index eee1d78..6988e39 100644
--- a/pom.xml
+++ b/pom.xml
@@ -80,6 +80,7 @@
     <module>sql-streaming-akka</module>
     <module>sql-streaming-mqtt</module>
     <module>sql-streaming-jdbc</module>
+    <module>sql-streaming-sqs</module>
     <module>streaming-akka</module>
     <module>streaming-mqtt</module>
     <module>streaming-pubnub</module>
diff --git a/sql-streaming-sqs/README.md b/sql-streaming-sqs/README.md
new file mode 100644
index 0000000..c59ff5d
--- /dev/null
+++ b/sql-streaming-sqs/README.md
@@ -0,0 +1,59 @@
+A library for reading data from Amzon S3 with optimised listing using Amazon SQS using Spark SQL Streaming ( or Structured streaming.). 
+
+## Linking
+
+Using SBT:
+
+    libraryDependencies += "org.apache.bahir" %% "spark-sql-streaming-sqs" % "{{site.SPARK_VERSION}}"
+
+Using Maven:
+
+    <dependency>
+        <groupId>org.apache.bahir</groupId>
+        <artifactId>spark-sql-streaming-akka_{{site.SCALA_BINARY_VERSION}}</artifactId>
+        <version>{{site.SPARK_VERSION}}</version>
+    </dependency>
+
+This library can also be added to Spark jobs launched through `spark-shell` or `spark-submit` by using the `--packages` command line option.
+For example, to include it when starting the spark shell:
+
+    $ bin/spark-shell --packages org.apache.bahir:spark-sql-streaming-sqs_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION}}
+
+Unlike using `--jars`, using `--packages` ensures that this library and its dependencies will be added to the classpath.
+The `--packages` argument can also be used with `bin/spark-submit`.
+
+This library is compiled for Scala 2.12 only, and intends to support Spark 2.4.0 onwards.
+
+## Configuration options
+The configuration is obtained from parameters.
+
+Name |Default | Meaning
+--- |:---:| ---
+sqsUrl|required, no default value|sqs queue url, like 'https://sqs.us-east-1.amazonaws.com/330183209093/TestQueue'
+region|required, no default value|AWS region where queue is created
+fileFormat|required, no default value|file format for the s3 files stored on Amazon S3
+schema|required, no default value|schema of the data being read 
+sqsFetchIntervalSeconds|10|time interval (in seconds) after which to fetch messages from Amazon SQS queue
+sqsLongPollingWaitTimeSeconds|20|wait time (in seconds) for long polling on Amazon SQS queue 
+sqsMaxConnections|1|number of parallel threads to connect to Amazon SQS queue
+sqsMaxRetries|10|Maximum number of consecutive retries in case of a connection failure to SQS before giving up
+ignoreFileDeletion|false|whether to ignore any File deleted message in SQS queue
+fileNameOnly|false|Whether to check new files based on only the filename instead of on the full path
+shouldSortFiles|true|whether to sort files based on timestamp while listing them from SQS
+useInstanceProfileCredentials|false|Whether to use EC2 instance profile credentials for connecting to Amazon SQS
+maxFilesPerTrigger|no default value|maximum number of files to process in a microbatch
+maxFileAge|7d|Maximum age of a file that can be found in this directory
+
+## Example
+
+An example to create a SQL stream which uses Amazon SQS to list files on S3,
+
+        val inputDf = sparkSession
+                          .readStream
+                          .format("s3-sqs")
+                          .schema(schema)
+                          .option("sqsUrl", queueUrl)
+                          .option("fileFormat", "json")
+                          .option("sqsFetchIntervalSeconds", "2")
+                          .option("sqsLongPollingWaitTimeSeconds", "5")
+                          .load()
diff --git a/sql-streaming-sqs/examples/src/main/scala/org/apache/bahir/examples/sql/streaming/sqs/SqsSourceExample.scala b/sql-streaming-sqs/examples/src/main/scala/org/apache/bahir/examples/sql/streaming/sqs/SqsSourceExample.scala
new file mode 100644
index 0000000..9686651
--- /dev/null
+++ b/sql-streaming-sqs/examples/src/main/scala/org/apache/bahir/examples/sql/streaming/sqs/SqsSourceExample.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bahir.examples.sql.streaming.sqs
+
+import scala.util.Random
+
+import org.apache.spark.sql.SparkSession
+
+ /**
+  * Example to read files from S3 using SQS Source and write results to Memory Sink
+  *
+  * Usage: SqsSourceExample <Sample Record Path to infer schema> <SQS Queue URL> <File Format>
+  */
+
+object SqsSourceExample {
+
+  def main(args: Array[String]) {
+
+    val randomName = Random.alphanumeric.take(6).mkString("")
+    val pathName = "path_" + randomName
+    val queryName = "query_" + randomName
+    val checkpointDir = s"/checkpoints/$pathName"
+    val schemaPathString = args(0)
+
+    val spark = SparkSession.builder().appName("SqsExample").getOrCreate()
+
+    val schema = spark.read.json(schemaPathString).schema
+
+    val queueUrl = args(1)
+
+    val fileFormat = args(2)
+
+    val inputDf = spark
+      .readStream
+      .format("s3-sqs")
+      .schema(schema)
+      .option("sqsUrl", queueUrl)
+      .option("fileFormat", fileFormat)
+      .option("sqsFetchIntervalSeconds", "2")
+      .option("sqsLongPollingWaitTimeSeconds", "5")
+      .option("maxFilesPerTrigger", "50")
+      .option("ignoreFileDeletion", "true")
+      .load()
+
+    val query = inputDf
+      .writeStream
+      .queryName(queryName)
+      .format("memory")
+      .option("checkpointLocation", checkpointDir)
+      .start()
+
+    query.awaitTermination()
+  }
+}
+
+
+
+
diff --git a/sql-streaming-sqs/pom.xml b/sql-streaming-sqs/pom.xml
new file mode 100644
index 0000000..40baed9
--- /dev/null
+++ b/sql-streaming-sqs/pom.xml
@@ -0,0 +1,117 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+~ Licensed to the Apache Software Foundation (ASF) under one or more
+~ contributor license agreements.  See the NOTICE file distributed with
+~ this work for additional information regarding copyright ownership.
+~ The ASF licenses this file to You under the Apache License, Version 2.0
+~ (the "License"); you may not use this file except in compliance with
+~ the License.  You may obtain a copy of the License at
+~
+~    http://www.apache.org/licenses/LICENSE-2.0
+~
+~ Unless required by applicable law or agreed to in writing, software
+~ distributed under the License is distributed on an "AS IS" BASIS,
+~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+~ See the License for the specific language governing permissions and
+~ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.bahir</groupId>
+    <artifactId>bahir-parent_2.12</artifactId>
+    <version>3.0.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <groupId>org.apache.bahir</groupId>
+  <artifactId>spark-sql-streaming-sqs_2.12</artifactId>
+  <properties>
+    <sbt.project.name>sql-streaming-sqs</sbt.project.name>
+  </properties>
+  <packaging>jar</packaging>
+  <name>Apache Bahir - Spark SQL Streaming SQS</name>
+  <url>http://bahir.apache.org/</url>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.bahir</groupId>
+      <artifactId>bahir-common_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sql_${scala.binary.version}</artifactId>
+      <version>${spark.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <version>${spark.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sql_${scala.binary.version}</artifactId>
+      <version>${spark.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
+      <version>${spark.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.amazonaws</groupId>
+      <artifactId>aws-java-sdk-sqs</artifactId>
+      <version>1.11.271</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-tags_${scala.binary.version}</artifactId>
+    </dependency>
+  </dependencies>
+  <build>
+    <pluginManagement>
+      <plugins>
+        <plugin>
+          <groupId>org.apache.maven.plugins</groupId>
+          <artifactId>maven-shade-plugin</artifactId>
+          <version>3.1.0</version>
+          <executions>
+            <execution>
+              <phase>package</phase>
+              <goals>
+                <goal>shade</goal>
+              </goals>
+              <configuration>
+                <artifactSet>
+                  <includes>
+                    <include>com.amazonaws:aws-java-sdk-sqs:*</include>
+                    <include>com.amazonaws:aws-java-sdk-core:*</include>
+                  </includes>
+                </artifactSet>
+                <filters>
+                  <filter>
+                    <artifact>*:*</artifact>
+                    <excludes>
+                      <exclude>META-INF/maven/**</exclude>
+                      <exclude>META-INF/MANIFEST.MF</exclude>
+                    </excludes>
+                  </filter>
+                </filters>
+              </configuration>
+            </execution>
+          </executions>
+        </plugin>
+      </plugins>        
+    </pluginManagement>
+    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+  </build>
+</project>
diff --git a/sql-streaming-sqs/src/main/java/org/apache/spark/sql/streaming/sqs/BasicAWSCredentialsProvider.java b/sql-streaming-sqs/src/main/java/org/apache/spark/sql/streaming/sqs/BasicAWSCredentialsProvider.java
new file mode 100644
index 0000000..4c3447c
--- /dev/null
+++ b/sql-streaming-sqs/src/main/java/org/apache/spark/sql/streaming/sqs/BasicAWSCredentialsProvider.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming.sqs;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.auth.AWSCredentials;
+import org.apache.commons.lang.StringUtils;
+
+public class BasicAWSCredentialsProvider implements AWSCredentialsProvider {
+  private final String accessKey;
+  private final String secretKey;
+
+  public BasicAWSCredentialsProvider(String accessKey, String secretKey) {
+    this.accessKey = accessKey;
+    this.secretKey = secretKey;
+  }
+
+  public AWSCredentials getCredentials() {
+    if (!StringUtils.isEmpty(accessKey) && !StringUtils.isEmpty(secretKey)) {
+      return new BasicAWSCredentials(accessKey, secretKey);
+    }
+    throw new AmazonClientException(
+        "Access key or secret key is null");
+  }
+
+  public void refresh() {}
+
+  @Override
+  public String toString() {
+    return getClass().getSimpleName();
+  }
+
+}
diff --git a/sql-streaming-sqs/src/main/java/org/apache/spark/sql/streaming/sqs/InstanceProfileCredentialsProviderWithRetries.java b/sql-streaming-sqs/src/main/java/org/apache/spark/sql/streaming/sqs/InstanceProfileCredentialsProviderWithRetries.java
new file mode 100644
index 0000000..8ef4fae
--- /dev/null
+++ b/sql-streaming-sqs/src/main/java/org/apache/spark/sql/streaming/sqs/InstanceProfileCredentialsProviderWithRetries.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming.sqs;
+
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.InstanceProfileCredentialsProvider;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class InstanceProfileCredentialsProviderWithRetries
+        extends InstanceProfileCredentialsProvider {
+
+    private static final Log LOG = LogFactory.getLog(
+            InstanceProfileCredentialsProviderWithRetries.class);
+
+    public AWSCredentials getCredentials() {
+        int retries = 10;
+        int sleep = 500;
+        while(retries > 0) {
+            try {
+                return super.getCredentials();
+            }
+            catch (RuntimeException re) {
+                LOG.error("Got an exception while fetching credentials " + re);
+                --retries;
+                try {
+                    Thread.sleep(sleep);
+                } catch (InterruptedException ie) {
+                    // Do nothing
+                }
+                if (sleep < 10000) {
+                    sleep *= 2;
+                }
+            }
+            catch (Error error) {
+                LOG.error("Got an exception while fetching credentials " + error);
+                --retries;
+                try {
+                    Thread.sleep(sleep);
+                } catch (InterruptedException ie) {
+                    // Do nothing
+                }
+                if (sleep < 10000) {
+                    sleep *= 2;
+                }
+            }
+        }
+        throw new AmazonClientException("Unable to load credentials.");
+    }
+}
diff --git a/sql-streaming-sqs/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/sql-streaming-sqs/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
new file mode 100644
index 0000000..953ca18
--- /dev/null
+++ b/sql-streaming-sqs/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.spark.sql.streaming.sqs.SqsSourceProvider
\ No newline at end of file
diff --git a/sql-streaming-sqs/src/main/resources/log4j.properties b/sql-streaming-sqs/src/main/resources/log4j.properties
new file mode 100644
index 0000000..e450494
--- /dev/null
+++ b/sql-streaming-sqs/src/main/resources/log4j.properties
@@ -0,0 +1,37 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+log4j.rootCategory=WARN, console
+
+# File appender
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=false
+log4j.appender.file.file=target/unit-tests.log
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
+
+# Console appender
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.out
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
+
+# Settings to quiet third party logs that are too verbose
+log4j.logger.org.sparkproject.jetty=WARN
+log4j.logger.org.sparkproject.jetty.util.component.AbstractLifeCycle=ERROR
+log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
+log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
diff --git a/sql-streaming-sqs/src/main/scala/org/apache/spark/sql/streaming/sqs/SqsClient.scala b/sql-streaming-sqs/src/main/scala/org/apache/spark/sql/streaming/sqs/SqsClient.scala
new file mode 100644
index 0000000..04bfb09
--- /dev/null
+++ b/sql-streaming-sqs/src/main/scala/org/apache/spark/sql/streaming/sqs/SqsClient.scala
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming.sqs
+
+import java.text.SimpleDateFormat
+import java.util.TimeZone
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+
+import com.amazonaws.{AmazonClientException, AmazonServiceException, ClientConfiguration}
+import com.amazonaws.services.sqs.{AmazonSQS, AmazonSQSClientBuilder}
+import com.amazonaws.services.sqs.model.{DeleteMessageBatchRequestEntry, Message, ReceiveMessageRequest}
+import org.apache.hadoop.conf.Configuration
+import org.json4s.{DefaultFormats, MappingException}
+import org.json4s.JsonAST.JValue
+import org.json4s.jackson.JsonMethods.parse
+
+import org.apache.spark.SparkException
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.ThreadUtils
+
+class SqsClient(sourceOptions: SqsSourceOptions,
+                hadoopConf: Configuration) extends Logging {
+
+  private val sqsFetchIntervalSeconds = sourceOptions.fetchIntervalSeconds
+  private val sqsLongPollWaitTimeSeconds = sourceOptions.longPollWaitTimeSeconds
+  private val sqsMaxRetries = sourceOptions.maxRetries
+  private val maxConnections = sourceOptions.maxConnections
+  private val ignoreFileDeletion = sourceOptions.ignoreFileDeletion
+  private val region = sourceOptions.region
+  val sqsUrl = sourceOptions.sqsUrl
+
+  @volatile var exception: Option[Exception] = None
+
+  private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
+  timestampFormat.setTimeZone(TimeZone.getTimeZone("UTC"))
+  private var retriesOnFailure = 0
+  private val sqsClient = createSqsClient()
+
+  val sqsScheduler = ThreadUtils.newDaemonSingleThreadScheduledExecutor("sqs-scheduler")
+
+  val sqsFileCache = new SqsFileCache(sourceOptions.maxFileAgeMs, sourceOptions.fileNameOnly)
+
+  val deleteMessageQueue = new java.util.concurrent.ConcurrentLinkedQueue[String]()
+
+  private val sqsFetchMessagesThread = new Runnable {
+    override def run(): Unit = {
+      try {
+        // Fetching messages from Amazon SQS
+        val newMessages = sqsFetchMessages()
+
+        // Filtering the new messages which are already not seen
+        if (newMessages.nonEmpty) {
+          newMessages.filter(message => sqsFileCache.isNewFile(message._1, message._2))
+            .foreach(message =>
+              sqsFileCache.add(message._1, MessageDescription(message._2, false, message._3)))
+        }
+      } catch {
+        case e: Exception =>
+          exception = Some(e)
+      }
+    }
+  }
+
+  sqsScheduler.scheduleWithFixedDelay(
+    sqsFetchMessagesThread,
+    0,
+    sqsFetchIntervalSeconds,
+    TimeUnit.SECONDS)
+
+  private def sqsFetchMessages(): Seq[(String, Long, String)] = {
+    val messageList = try {
+      val receiveMessageRequest = new ReceiveMessageRequest()
+        .withQueueUrl(sqsUrl)
+        .withWaitTimeSeconds(sqsLongPollWaitTimeSeconds)
+      val messages = sqsClient.receiveMessage(receiveMessageRequest).getMessages.asScala
+      retriesOnFailure = 0
+      logDebug(s"successfully received ${messages.size} messages")
+      messages
+    } catch {
+      case ase: AmazonServiceException =>
+        val message =
+        """
+          |Caught an AmazonServiceException, which means your request made it to Amazon SQS,
+          | rejected with an error response for some reason.
+        """.stripMargin
+        logWarning(message)
+        logWarning(s"Error Message: ${ase.getMessage}")
+        logWarning(s"HTTP Status Code: ${ase.getStatusCode}, AWS Error Code: ${ase.getErrorCode}")
+        logWarning(s"Error Type: ${ase.getErrorType}, Request ID: ${ase.getRequestId}")
+        evaluateRetries()
+        List.empty
+      case ace: AmazonClientException =>
+        val message =
+        """
+           |Caught an AmazonClientException, which means, the client encountered a serious
+           | internal problem while trying to communicate with Amazon SQS, such as not
+           |  being able to access the network.
+        """.stripMargin
+        logWarning(message)
+        logWarning(s"Error Message: ${ace.getMessage()}")
+        evaluateRetries()
+        List.empty
+      case e: Exception =>
+        val message = "Received unexpected error from SQS"
+        logWarning(message)
+        logWarning(s"Error Message: ${e.getMessage()}")
+        evaluateRetries()
+        List.empty
+    }
+    if (messageList.nonEmpty) {
+      parseSqsMessages(messageList)
+    } else {
+      Seq.empty
+    }
+  }
+
+  private def parseSqsMessages(messageList: Seq[Message]): Seq[(String, Long, String)] = {
+    val errorMessages = scala.collection.mutable.ListBuffer[String]()
+    val parsedMessages = messageList.foldLeft(Seq[(String, Long, String)]()) { (list, message) =>
+      implicit val formats = DefaultFormats
+      try {
+        val messageReceiptHandle = message.getReceiptHandle
+        val messageJson = parse(message.getBody).extract[JValue]
+        val bucketName = (
+          messageJson \ "Records" \ "s3" \ "bucket" \ "name").extract[Array[String]].head
+        val eventName = (messageJson \ "Records" \ "eventName").extract[Array[String]].head
+        if (eventName.contains("ObjectCreated")) {
+          val timestamp = (messageJson \ "Records" \ "eventTime").extract[Array[String]].head
+          val timestampMills = convertTimestampToMills(timestamp)
+          val path = "s3://" +
+            bucketName + "/" +
+            (messageJson \ "Records" \ "s3" \ "object" \ "key").extract[Array[String]].head
+          logDebug("Successfully parsed sqs message")
+          list :+ ((path, timestampMills, messageReceiptHandle))
+        } else {
+          if (eventName.contains("ObjectRemoved")) {
+            if (!ignoreFileDeletion) {
+              exception = Some(new SparkException("ObjectDelete message detected in SQS"))
+            } else {
+              logInfo("Ignoring file deletion message since ignoreFileDeletion is true")
+            }
+          } else {
+            logWarning("Ignoring unexpected message detected in SQS")
+          }
+          errorMessages.append(messageReceiptHandle)
+          list
+        }
+      } catch {
+        case me: MappingException =>
+          errorMessages.append(message.getReceiptHandle)
+          logWarning(s"Error in parsing SQS message ${me.getMessage}")
+          list
+        case e: Exception =>
+          errorMessages.append(message.getReceiptHandle)
+          logWarning(s"Unexpected error while parsing SQS message ${e.getMessage}")
+          list
+      }
+    }
+    if (errorMessages.nonEmpty) {
+      addToDeleteMessageQueue(errorMessages.toList)
+    }
+    parsedMessages
+  }
+
+  private def convertTimestampToMills(timestamp: String): Long = {
+    val timeInMillis = timestampFormat.parse(timestamp).getTime()
+    timeInMillis
+  }
+
+  private def evaluateRetries(): Unit = {
+    retriesOnFailure += 1
+    if (retriesOnFailure >= sqsMaxRetries) {
+      logError("Max retries reached")
+      exception = Some(new SparkException("Unable to receive Messages from SQS for " +
+        s"${sqsMaxRetries} times Giving up. Check logs for details."))
+    } else {
+      logWarning(s"Attempt ${retriesOnFailure}." +
+        s"Will reattempt after ${sqsFetchIntervalSeconds} seconds")
+    }
+  }
+
+  private def createSqsClient(): AmazonSQS = {
+    try {
+      val isClusterOnEc2Role = hadoopConf.getBoolean(
+        "fs.s3.isClusterOnEc2Role", false) || hadoopConf.getBoolean(
+        "fs.s3n.isClusterOnEc2Role", false) || sourceOptions.useInstanceProfileCredentials
+      if (!isClusterOnEc2Role) {
+        val accessKey = hadoopConf.getTrimmed("fs.s3n.awsAccessKeyId")
+        val secretAccessKey = new String(hadoopConf.getPassword("fs.s3n.awsSecretAccessKey")).trim
+        logInfo("Using credentials from keys provided")
+        val basicAwsCredentialsProvider = new BasicAWSCredentialsProvider(
+          accessKey, secretAccessKey)
+        AmazonSQSClientBuilder
+          .standard()
+          .withClientConfiguration(new ClientConfiguration().withMaxConnections(maxConnections))
+          .withCredentials(basicAwsCredentialsProvider)
+          .withRegion(region)
+          .build()
+      } else {
+        logInfo("Using the credentials attached to the instance")
+        val instanceProfileCredentialsProvider = new InstanceProfileCredentialsProviderWithRetries()
+        AmazonSQSClientBuilder
+          .standard()
+          .withClientConfiguration(new ClientConfiguration().withMaxConnections(maxConnections))
+          .withCredentials(instanceProfileCredentialsProvider)
+          .build()
+      }
+    } catch {
+      case e: Exception =>
+        throw new SparkException(s"Error occured while creating Amazon SQS Client ${e.getMessage}")
+    }
+  }
+
+  def addToDeleteMessageQueue(messageReceiptHandles: List[String]): Unit = {
+    deleteMessageQueue.addAll(messageReceiptHandles.asJava)
+  }
+
+  def deleteMessagesFromQueue(): Unit = {
+    try {
+      var count = -1
+      val messageReceiptHandles = deleteMessageQueue.asScala.toList
+      val messageGroups = messageReceiptHandles.sliding(10, 10).toList
+      messageGroups.foreach { messageGroup =>
+        val requestEntries = messageGroup.foldLeft(List[DeleteMessageBatchRequestEntry]()) {
+          (list, messageReceiptHandle) =>
+            count = count + 1
+            list :+ new DeleteMessageBatchRequestEntry(count.toString, messageReceiptHandle)
+        }.asJava
+        val batchResult = sqsClient.deleteMessageBatch(sqsUrl, requestEntries)
+        if (!batchResult.getFailed.isEmpty) {
+          batchResult.getFailed.asScala.foreach { entry =>
+            sqsClient.deleteMessage(
+              sqsUrl, requestEntries.get(entry.getId.toInt).getReceiptHandle)
+          }
+        }
+      }
+    } catch {
+      case e: Exception =>
+        logWarning(s"Unable to delete message from SQS ${e.getMessage}")
+    }
+    deleteMessageQueue.clear()
+  }
+
+  def assertSqsIsWorking(): Unit = {
+    if (exception.isDefined) {
+      throw exception.get
+    }
+  }
+
+}
diff --git a/sql-streaming-sqs/src/main/scala/org/apache/spark/sql/streaming/sqs/SqsFileCache.scala b/sql-streaming-sqs/src/main/scala/org/apache/spark/sql/streaming/sqs/SqsFileCache.scala
new file mode 100644
index 0000000..ff42448
--- /dev/null
+++ b/sql-streaming-sqs/src/main/scala/org/apache/spark/sql/streaming/sqs/SqsFileCache.scala
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming.sqs
+
+import java.net.URI
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.internal.Logging
+
+  /**
+   * A custom hash map used to track the list of files seen. This map is thread-safe.
+   * To prevent the hash map from growing indefinitely, a purge function is available to
+   * remove files "maxAgeMs" older than the latest file.
+   */
+
+class SqsFileCache(maxAgeMs: Long, fileNameOnly: Boolean) extends Logging {
+  require(maxAgeMs >= 0)
+  if (fileNameOnly) {
+    logWarning("'fileNameOnly' is enabled. Make sure your file names are unique (e.g. using " +
+      "UUID), otherwise, files with the same name but under different paths will be considered " +
+      "the same and causes data lost.")
+  }
+
+  /** Mapping from file path to its message description. */
+  private val sqsMap = new ConcurrentHashMap[String, MessageDescription]
+
+  /** Timestamp for the last purge operation. */
+  private var lastPurgeTimestamp: Long = 0L
+
+  /** Timestamp of the latest file. */
+  private var latestTimestamp: Long = 0L
+
+  @inline private def stripPathIfNecessary(path: String) = {
+    if (fileNameOnly) new Path(new URI(path)).getName else path
+  }
+
+   /**
+    * Returns true if we should consider this file a new file. The file is only considered "new"
+    * if it is new enough that we are still tracking, and we have not seen it before.
+    */
+  def isNewFile(path: String, timestamp: Long): Boolean = {
+    timestamp >= lastPurgeTimestamp && !sqsMap.containsKey(stripPathIfNecessary(path))
+  }
+
+  /** Add a new file to the map. */
+  def add(path: String, fileStatus: MessageDescription): Unit = {
+    sqsMap.put(stripPathIfNecessary(path), fileStatus)
+    if (fileStatus.timestamp > latestTimestamp) {
+      latestTimestamp = fileStatus.timestamp
+    }
+  }
+
+   /**
+    * Returns all the new files found - ignore aged files and files that we have already seen.
+    * Sorts the files by timestamp.
+    */
+  def getUncommittedFiles(maxFilesPerTrigger: Option[Int],
+                             shouldSortFiles: Boolean): Seq[(String, Long, String)] = {
+    if (shouldSortFiles) {
+      val uncommittedFiles = filterAllUncommittedFiles()
+      val sortedFiles = reportTimeTaken("Sorting Files") {
+         uncommittedFiles.sortWith(_._2 < _._2)
+      }
+      if (maxFilesPerTrigger.nonEmpty) sortedFiles.take(maxFilesPerTrigger.get) else sortedFiles
+    } else {
+      if (maxFilesPerTrigger.isEmpty) {
+        filterAllUncommittedFiles()
+      } else {
+        filterTopUncommittedFiles(maxFilesPerTrigger.get)
+      }
+    }
+  }
+    private def filterTopUncommittedFiles(maxFilesPerTrigger: Int): List[(String, Long, String)] = {
+      val iterator = sqsMap.asScala.iterator
+      val uncommittedFiles = ListBuffer[(String, Long, String)]()
+      while (uncommittedFiles.length < maxFilesPerTrigger && iterator.hasNext) {
+        val file = iterator.next()
+        if (file._2.isCommitted && file._2.timestamp >= lastPurgeTimestamp) {
+          uncommittedFiles += ((file._1, file._2.timestamp, file._2.messageReceiptHandle))
+        }
+      }
+      uncommittedFiles.toList
+    }
+
+    private def reportTimeTaken[T](operation: String)(body: => T): T = {
+      val startTime = System.currentTimeMillis()
+      val result = body
+      val endTime = System.currentTimeMillis()
+      val timeTaken = math.max(endTime - startTime, 0)
+
+      logDebug(s"$operation took $timeTaken ms")
+      result
+    }
+
+    private def filterAllUncommittedFiles(): List[(String, Long, String)] = {
+      sqsMap.asScala.foldLeft(List[(String, Long, String)]()) {
+        (list, file) =>
+          if (!file._2.isCommitted && file._2.timestamp >= lastPurgeTimestamp) {
+            list :+ ((file._1, file._2.timestamp, file._2.messageReceiptHandle))
+          } else {
+            list
+          }
+      }
+    }
+
+  /** Removes aged entries and returns the number of files removed. */
+  def purge(): Int = {
+    lastPurgeTimestamp = latestTimestamp - maxAgeMs
+    var count = 0
+    sqsMap.asScala.foreach { fileEntry =>
+      if (fileEntry._2.timestamp < lastPurgeTimestamp) {
+        sqsMap.remove(fileEntry._1)
+        count += 1
+      }
+    }
+    count
+  }
+
+  /** Mark file entry as committed or already processed */
+  def markCommitted(path: String): Unit = {
+    sqsMap.replace(path, MessageDescription(
+      sqsMap.get(path).timestamp, true, sqsMap.get(path).messageReceiptHandle))
+  }
+
+  def size: Int = sqsMap.size()
+
+}
+
+   /**
+    * A case class to store file metadata. Metadata includes file timestamp, file status -
+    * committed or not committed and message reciept handle used for deleting message from
+    * Amazon SQS
+    */
+case class MessageDescription(timestamp: Long,
+                              isCommitted: Boolean = false,
+                              messageReceiptHandle: String)
diff --git a/sql-streaming-sqs/src/main/scala/org/apache/spark/sql/streaming/sqs/SqsSource.scala b/sql-streaming-sqs/src/main/scala/org/apache/spark/sql/streaming/sqs/SqsSource.scala
new file mode 100644
index 0000000..4f301cc
--- /dev/null
+++ b/sql-streaming-sqs/src/main/scala/org/apache/spark/sql/streaming/sqs/SqsSource.scala
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming.sqs
+
+import java.net.URI
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
+import org.apache.spark.sql.execution.datasources.{DataSource, LogicalRelation}
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.execution.streaming.FileStreamSource._
+import org.apache.spark.sql.types.StructType
+
+
+class SqsSource(sparkSession: SparkSession,
+                metadataPath: String,
+                options: Map[String, String],
+                override val schema: StructType) extends Source with Logging {
+
+  private val sourceOptions = new SqsSourceOptions(options)
+
+  private val hadoopConf = sparkSession.sessionState.newHadoopConf()
+
+  private val metadataLog =
+    new FileStreamSourceLog(FileStreamSourceLog.VERSION, sparkSession, metadataPath)
+  private var metadataLogCurrentOffset = metadataLog.getLatest().map(_._1).getOrElse(-1L)
+
+  private val maxFilesPerTrigger = sourceOptions.maxFilesPerTrigger
+
+  private val maxFileAgeMs: Long = sourceOptions.maxFileAgeMs
+
+  private val fileFormatClassName = sourceOptions.fileFormatClassName
+
+  private val shouldSortFiles = sourceOptions.shouldSortFiles
+
+  private val sqsClient = new SqsClient(sourceOptions, hadoopConf)
+
+  metadataLog.allFiles().foreach { entry =>
+    sqsClient.sqsFileCache.add(entry.path, MessageDescription(entry.timestamp, true, ""))
+  }
+  sqsClient.sqsFileCache.purge()
+
+  logInfo(s"maxFilesPerBatch = $maxFilesPerTrigger, maxFileAgeMs = $maxFileAgeMs")
+
+   /**
+    * Returns the data that is between the offsets (`start`, `end`].
+    */
+  override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
+    val startOffset = start.map(FileStreamSourceOffset(_).logOffset).getOrElse(-1L)
+    val endOffset = FileStreamSourceOffset(end).logOffset
+
+    assert(startOffset <= endOffset)
+    val files = metadataLog.get(Some(startOffset + 1), Some(endOffset)).flatMap(_._2)
+    logInfo(s"Processing ${files.length} files from ${startOffset + 1}:$endOffset")
+    logTrace(s"Files are:\n\t" + files.mkString("\n\t"))
+    val newDataSource =
+      DataSource(
+        sparkSession,
+        paths = files.map(f => new Path(new URI(f.path)).toString),
+        userSpecifiedSchema = Some(schema),
+        className = fileFormatClassName,
+        options = options)
+    Dataset.ofRows(sparkSession, LogicalRelation(newDataSource.resolveRelation(
+      checkFilesExist = false), isStreaming = true))
+  }
+
+  private def fetchMaxOffset(): FileStreamSourceOffset = synchronized {
+
+    sqsClient.assertSqsIsWorking()
+    /**
+     * All the new files found - ignore aged files and files that we have seen.
+     *  Obey user's setting to limit the number of files in this batch trigger.
+     */
+    val batchFiles = sqsClient.sqsFileCache.getUncommittedFiles(maxFilesPerTrigger, shouldSortFiles)
+
+    if (batchFiles.nonEmpty) {
+      metadataLogCurrentOffset += 1
+      metadataLog.add(metadataLogCurrentOffset, batchFiles.map {
+        case (path, timestamp, receiptHandle) =>
+          FileEntry(path = path, timestamp = timestamp, batchId = metadataLogCurrentOffset)
+      }.toArray)
+      logInfo(s"Log offset set to $metadataLogCurrentOffset with ${batchFiles.size} new files")
+      val messageReceiptHandles = batchFiles.map {
+        case (path, timestamp, receiptHandle) =>
+          sqsClient.sqsFileCache.markCommitted(path)
+          logDebug(s"New file: $path")
+          receiptHandle
+      }.toList
+      sqsClient.addToDeleteMessageQueue(messageReceiptHandles)
+    }
+
+    val numPurged = sqsClient.sqsFileCache.purge()
+
+    if (!sqsClient.deleteMessageQueue.isEmpty) {
+      sqsClient.deleteMessagesFromQueue()
+    }
+
+    logTrace(
+      s"""
+         |Number of files selected for batch = ${batchFiles.size}
+         |Number of files purged from tracking map = $numPurged
+       """.stripMargin)
+
+    FileStreamSourceOffset(metadataLogCurrentOffset)
+  }
+
+  override def getOffset: Option[Offset] = Some(fetchMaxOffset()).filterNot(_.logOffset == -1)
+
+  override def commit(end: Offset): Unit = {
+    // No-op for now; SqsSource currently garbage-collects files based on timestamp
+    // and the value of the maxFileAge parameter.
+  }
+
+  override def stop(): Unit = {
+    if (!sqsClient.sqsScheduler.isTerminated) {
+      sqsClient.sqsScheduler.shutdownNow()
+    }
+  }
+
+  override def toString: String = s"SqsSource[${sqsClient.sqsUrl}]"
+
+}
+
diff --git a/sql-streaming-sqs/src/main/scala/org/apache/spark/sql/streaming/sqs/SqsSourceOptions.scala b/sql-streaming-sqs/src/main/scala/org/apache/spark/sql/streaming/sqs/SqsSourceOptions.scala
new file mode 100644
index 0000000..a4c0cc1
--- /dev/null
+++ b/sql-streaming-sqs/src/main/scala/org/apache/spark/sql/streaming/sqs/SqsSourceOptions.scala
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming.sqs
+
+import scala.util.Try
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.util.Utils
+
+/**
+ * User specified options for sqs source.
+ */
+class SqsSourceOptions(parameters: CaseInsensitiveMap[String]) extends Logging {
+
+  def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters))
+
+  val maxFilesPerTrigger: Option[Int] = parameters.get("maxFilesPerTrigger").map { str =>
+    Try(str.toInt).toOption.filter(_ > 0).getOrElse {
+      throw new IllegalArgumentException(
+        s"Invalid value '$str' for option 'maxFilesPerTrigger', must be a positive integer")
+    }
+  }
+
+  /**
+   * Maximum age of a file that can be found in this directory, before it is ignored. For the
+   * first batch all files will be considered valid.
+   *
+   * The max age is specified with respect to the timestamp of the latest file, and not the
+   * timestamp of the current system. That this means if the last file has timestamp 1000, and the
+   * current system time is 2000, and max age is 200, the system will purge files older than
+   * 800 (rather than 1800) from the internal state.
+   *
+   * Default to a week.
+   */
+  val maxFileAgeMs: Long =
+    Utils.timeStringAsMs(parameters.getOrElse("maxFileAge", "7d"))
+
+  val fetchIntervalSeconds: Int = parameters.get("sqsFetchIntervalSeconds").map { str =>
+    Try(str.toInt).toOption.filter(_ > 0).getOrElse {
+      throw new IllegalArgumentException(
+        s"Invalid value '$str' for option 'sqsFetchIntervalSeconds', must be a positive integer")
+    }
+  }.getOrElse(10)
+
+  val longPollWaitTimeSeconds: Int = parameters.get("sqsLongPollingWaitTimeSeconds").map { str =>
+    Try(str.toInt).toOption.filter(x => x >= 0 && x <= 20).getOrElse {
+      throw new IllegalArgumentException(
+        s"Invalid value '$str' for option 'sqsLongPollingWaitTimeSeconds'," +
+          "must be an integer between 0 and 20")
+    }
+  }.getOrElse(20)
+
+  val maxRetries: Int = parameters.get("sqsMaxRetries").map { str =>
+    Try(str.toInt).toOption.filter(_ > 0).getOrElse {
+      throw new IllegalArgumentException(
+        s"Invalid value '$str' for option 'sqsMaxRetries', must be a positive integer")
+    }
+  }.getOrElse(10)
+
+  val maxConnections: Int = parameters.get("sqsMaxConnections").map { str =>
+    Try(str.toInt).toOption.filter(_ > 0).getOrElse {
+      throw new IllegalArgumentException(
+        s"Invalid value '$str' for option 'sqsMaxConnections', must be a positive integer")
+    }
+  }.getOrElse(1)
+
+  val sqsUrl: String = parameters.get("sqsUrl").getOrElse{
+    throw new IllegalArgumentException("SQS Url is not specified")
+  }
+
+  val region: String = parameters.get("region").getOrElse {
+    throw new IllegalArgumentException("Region is not specified")
+  }
+
+  val fileFormatClassName: String = parameters.get("fileFormat").getOrElse {
+    throw new IllegalArgumentException("Specifying file format is mandatory with sqs source")
+  }
+
+  val ignoreFileDeletion: Boolean = withBooleanParameter("ignoreFileDeletion", false)
+
+   /**
+    * Whether to check new files based on only the filename instead of on the full path.
+    *
+    * With this set to `true`, the following files would be considered as the same file, because
+    * their filenames, "dataset.txt", are the same:
+    * - "file:///dataset.txt"
+    * - "s3://a/dataset.txt"
+    * - "s3n://a/b/dataset.txt"
+    * - "s3a://a/b/c/dataset.txt"
+    */
+  val fileNameOnly: Boolean = withBooleanParameter("fileNameOnly", false)
+
+  val shouldSortFiles: Boolean = withBooleanParameter("shouldSortFiles", true)
+
+  val useInstanceProfileCredentials: Boolean = withBooleanParameter(
+    "useInstanceProfileCredentials", false)
+
+  private def withBooleanParameter(name: String, default: Boolean) = {
+    parameters.get(name).map { str =>
+      try {
+        str.toBoolean
+      } catch {
+        case _: IllegalArgumentException =>
+          throw new IllegalArgumentException(
+            s"Invalid value '$str' for option '$name', must be true or false")
+      }
+    }.getOrElse(default)
+  }
+
+}
diff --git a/sql-streaming-sqs/src/main/scala/org/apache/spark/sql/streaming/sqs/SqsSourceProvider.scala b/sql-streaming-sqs/src/main/scala/org/apache/spark/sql/streaming/sqs/SqsSourceProvider.scala
new file mode 100644
index 0000000..ce2b9e0
--- /dev/null
+++ b/sql-streaming-sqs/src/main/scala/org/apache/spark/sql/streaming/sqs/SqsSourceProvider.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming.sqs
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.execution.streaming.Source
+import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
+import org.apache.spark.sql.types.StructType
+
+class SqsSourceProvider extends DataSourceRegister
+  with StreamSourceProvider
+  with Logging {
+
+  override def shortName(): String = "s3-sqs"
+
+  override def sourceSchema(sqlContext: SQLContext,
+                            schema: Option[StructType],
+                            providerName: String,
+                            parameters: Map[String, String]): (String, StructType) = {
+
+    require(schema.isDefined, "Sqs source doesn't support empty schema")
+    (shortName(), schema.get)
+  }
+
+  override def createSource(sqlContext: SQLContext,
+                            metadataPath: String,
+                            schema: Option[StructType],
+                            providerName: String,
+                            parameters: Map[String, String]): Source = {
+
+    new SqsSource(
+      sqlContext.sparkSession,
+      metadataPath,
+      parameters,
+      schema.get)
+  }
+}
diff --git a/sql-streaming-sqs/src/test/resources/log4j.properties b/sql-streaming-sqs/src/test/resources/log4j.properties
new file mode 100644
index 0000000..a9166df
--- /dev/null
+++ b/sql-streaming-sqs/src/test/resources/log4j.properties
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the file target/unit-tests.log
+log4j.rootCategory=INFO, file
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=true
+log4j.appender.file.file=target/unit-tests.log
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
+
+# Ignore messages below warning level from Jetty, because it's a bit verbose
+log4j.logger.org.sparkproject.jetty=WARN
diff --git a/sql-streaming-sqs/src/test/scala/org/apache/spark/sql/streaming/sqs/SqsSourceOptionsSuite.scala b/sql-streaming-sqs/src/test/scala/org/apache/spark/sql/streaming/sqs/SqsSourceOptionsSuite.scala
new file mode 100644
index 0000000..6382fb1
--- /dev/null
+++ b/sql-streaming-sqs/src/test/scala/org/apache/spark/sql/streaming/sqs/SqsSourceOptionsSuite.scala
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.streaming.sqs
+
+import java.util.Locale
+
+import org.apache.spark.sql.streaming.{StreamingQuery, StreamingQueryException, StreamTest}
+import org.apache.spark.sql.types.StructType
+
+class SqsSourceOptionsSuite extends StreamTest {
+
+  test("bad source options") {
+    def testBadOptions(option: (String, String))(expectedMsg: String): Unit = {
+
+      var query : StreamingQuery = null
+
+      try {
+        val errorMessage = intercept[StreamingQueryException] {
+          val dummySchema = new StructType
+          val reader = spark
+            .readStream
+            .format("s3-sqs")
+            .option("fileFormat", "json")
+            .schema(dummySchema)
+            .option("sqsUrl", "https://DUMMY_URL")
+            .option("region", "us-east-1")
+            .option(option._1, option._2)
+            .load()
+
+          query = reader.writeStream
+            .format("memory")
+            .queryName("badOptionsTest")
+            .start()
+
+          query.processAllAvailable()
+        }.getMessage
+        assert(errorMessage.toLowerCase(Locale.ROOT).contains(expectedMsg.toLowerCase(Locale.ROOT)))
+      } finally {
+        if (query != null) {
+          // terminating streaming query if necessary
+          query.stop()
+        }
+
+      }
+    }
+
+    testBadOptions("sqsFetchIntervalSeconds" -> "-2")("Invalid value '-2' " +
+      "for option 'sqsFetchIntervalSeconds', must be a positive integer")
+    testBadOptions("sqsLongPollingWaitTimeSeconds" -> "-5")("Invalid value '-5' " +
+      "for option 'sqsLongPollingWaitTimeSeconds',must be an integer between 0 and 20")
+    testBadOptions("sqsMaxConnections" -> "-2")("Invalid value '-2' " +
+      "for option 'sqsMaxConnections', must be a positive integer")
+    testBadOptions("maxFilesPerTrigger" -> "-50")("Invalid value '-50' " +
+      "for option 'maxFilesPerTrigger', must be a positive integer")
+    testBadOptions("ignoreFileDeletion" -> "x")("Invalid value 'x' " +
+      "for option 'ignoreFileDeletion', must be true or false")
+    testBadOptions("fileNameOnly" -> "x")("Invalid value 'x' " +
+      "for option 'fileNameOnly', must be true or false")
+    testBadOptions("shouldSortFiles" -> "x")("Invalid value 'x' " +
+      "for option 'shouldSortFiles', must be true or false")
+    testBadOptions("useInstanceProfileCredentials" -> "x")("Invalid value 'x' " +
+      "for option 'useInstanceProfileCredentials', must be true or false")
+
+  }
+
+  test("missing mandatory options") {
+
+    def testMissingMandatoryOptions(options: List[(String, String)])(expectedMsg: String): Unit = {
+
+      var query: StreamingQuery = null
+
+      try {
+        val errorMessage = intercept[StreamingQueryException] {
+          val dummySchema = new StructType
+          val reader = spark
+            .readStream
+            .format("s3-sqs")
+            .schema(dummySchema)
+
+          val readerWithOptions = options.map { option =>
+           reader.option(option._1, option._2)
+          }.last.load()
+
+          query = readerWithOptions.writeStream
+            .format("memory")
+            .queryName("missingMandatoryOptions")
+            .start()
+
+          query.processAllAvailable()
+        }.getMessage
+        assert(errorMessage.toLowerCase(Locale.ROOT).contains(expectedMsg.toLowerCase(Locale.ROOT)))
+      } finally {
+        if (query != null) {
+          // terminating streaming query if necessary
+          query.stop()
+        }
+      }
+    }
+
+    // No fileFormat specified
+    testMissingMandatoryOptions(List("sqsUrl" -> "https://DUMMY_URL", "region" -> "us-east-1"))(
+      "Specifying file format is mandatory with sqs source")
+
+    // Sqs URL not specified
+    testMissingMandatoryOptions(List("fileFormat" -> "json", "region" -> "us-east-1"))(
+      "SQS Url is not specified")
+  }
+
+  test("schema not specified") {
+
+    var query: StreamingQuery = null
+
+    val expectedMsg = "Sqs source doesn't support empty schema"
+
+    try {
+      val errorMessage = intercept[IllegalArgumentException] {
+        val reader = spark
+          .readStream
+          .format("s3-sqs")
+          .option("sqsUrl", "https://DUMMY_URL")
+          .option("fileFormat", "json")
+          .option("region", "us-east-1")
+          .load()
+
+        query = reader.writeStream
+          .format("memory")
+          .queryName("missingSchema")
+          .start()
+
+        query.processAllAvailable()
+      }.getMessage
+      assert(errorMessage.toLowerCase(Locale.ROOT).contains(expectedMsg.toLowerCase(Locale.ROOT)))
+    } finally {
+      if (query != null) {
+        // terminating streaming query if necessary
+        query.stop()
+      }
+    }
+
+  }
+
+}
+