A library for reading data from Amazon S3 with optimised listing using Amazon SQS, built on Spark SQL Streaming (also known as Structured Streaming).
Using SBT:
```scala
libraryDependencies += "org.apache.bahir" %% "spark-sql-streaming-sqs" % "{{site.SPARK_VERSION}}"
```
Using Maven:
```xml
<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>spark-sql-streaming-sqs_{{site.SCALA_BINARY_VERSION}}</artifactId>
    <version>{{site.SPARK_VERSION}}</version>
</dependency>
```
This library can also be added to Spark jobs launched through spark-shell or spark-submit by using the --packages command line option. For example, to include it when starting the spark shell:
```
$ bin/spark-shell --packages org.apache.bahir:spark-sql-streaming-sqs_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION}}
```
Unlike using --jars, using --packages ensures that this library and its dependencies will be added to the classpath. The --packages argument can also be used with bin/spark-submit.
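For example, a submission might look like the following; the application class and jar names here are placeholders:

```
# the application class and jar below are placeholders
$ bin/spark-submit \
    --packages org.apache.bahir:spark-sql-streaming-sqs_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION}} \
    --class com.example.S3SqsStreamingApp \
    my-streaming-app.jar
```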
This library is compiled for Scala 2.12 only, and is intended to support Spark 2.4.0 onwards.
The source is configured through the following options:
| Name | Default | Meaning |
|---|---|---|
| sqsUrl | required, no default value | SQS queue URL, e.g. 'https://sqs.us-east-1.amazonaws.com/330183209093/TestQueue' |
| region | required, no default value | AWS region where queue is created |
| fileFormat | required, no default value | file format of the files stored on Amazon S3 |
| schema | required, no default value | schema of the data being read |
| sqsFetchIntervalSeconds | 10 | time interval (in seconds) after which to fetch messages from Amazon SQS queue |
| sqsLongPollingWaitTimeSeconds | 20 | wait time (in seconds) for long polling on Amazon SQS queue |
| sqsMaxConnections | 1 | number of parallel threads to connect to Amazon SQS queue |
| sqsMaxRetries | 10 | Maximum number of consecutive retries in case of a connection failure to SQS before giving up |
| ignoreFileDeletion | false | whether to ignore file-deletion messages in the SQS queue |
| fileNameOnly | false | whether to check new files based on the filename only instead of the full path |
| shouldSortFiles | true | whether to sort files based on timestamp while listing them from SQS |
| useInstanceProfileCredentials | false | Whether to use EC2 instance profile credentials for connecting to Amazon SQS |
| maxFilesPerTrigger | no default value | maximum number of files to process in a microbatch |
| maxFileAge | 7d | Maximum age of a file that can be found in this directory |
| messageWrapper | None | 'None' if the SQS queue contains plain S3 event messages; 'SNS' if it contains S3 notification messages delivered via SNS. See the discussion of multiple consumers below for details |
An SQS queue cannot be read by multiple consumers, since each message is delivered to only one of them. If the same S3 path should be listened to by multiple applications, the recommended approach is to fan notifications out through SNS: S3 -> SNS -> SQS, with one SQS queue subscribed to the SNS topic per application. Each application then reads from its own queue, so one S3 path can be processed by multiple applications. In this setup, set the messageWrapper option to 'SNS' so the source can unwrap the S3 notification from the SNS envelope.
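For instance, a second application reading from its own SNS-subscribed queue might configure the source as follows; this is a minimal sketch, and the queue URL is a placeholder:

```scala
val snsBackedDf = sparkSession
  .readStream
  .format("s3-sqs")
  .schema(schema)
  .option("sqsUrl", "https://sqs.us-east-1.amazonaws.com/330183209093/AppTwoQueue") // placeholder queue subscribed to the SNS topic
  .option("region", "us-east-1")
  .option("fileFormat", "json")
  .option("messageWrapper", "SNS") // unwrap S3 notifications delivered via SNS
  .load()
```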
An example of creating a SQL stream which uses Amazon SQS to list files on S3:
```scala
// `schema` and `queueUrl` are assumed to be defined earlier in the application
val inputDf = sparkSession
  .readStream
  .format("s3-sqs")
  .schema(schema)
  .option("sqsUrl", queueUrl)
  .option("region", "us-east-1") // AWS region of the queue (required, see the options table)
  .option("fileFormat", "json")
  .option("sqsFetchIntervalSeconds", "2")
  .option("sqsLongPollingWaitTimeSeconds", "5")
  .option("useInstanceProfileCredentials", "true")
  .load()
```
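To consume the stream, attach any standard Structured Streaming sink; below is a minimal illustrative example writing the records to the console, where the checkpoint location is a placeholder:

```scala
val query = inputDf
  .writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/s3-sqs-checkpoint") // placeholder path
  .start()

query.awaitTermination()
```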