blob: b0180b02d38b3b5a4319ba281b5ac0b9288f676e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* license agreements; and to You under the Apache License, version 2.0:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* This file is part of the Apache Pekko project, which was derived from Akka.
*/
/*
* Copyright (C) since 2016 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.scaladsl
import org.apache.pekko
import pekko.http.scaladsl.model.headers.ByteRange
import pekko.http.scaladsl.model.{ ContentType, ContentTypes, HttpEntity, HttpResponse, IllegalUriException }
import pekko.stream.Attributes
import pekko.stream.connectors.s3.BucketAccess.{ AccessDenied, AccessGranted, NotExists }
import pekko.stream.connectors.s3._
import pekko.stream.connectors.s3.headers.ServerSideEncryption
import pekko.stream.connectors.s3.scaladsl.{ S3, S3ClientIntegrationSpec, S3WireMockBase }
import pekko.stream.scaladsl.{ Keep, Sink, Source }
import pekko.util.ByteString
import pekko.{ Done, NotUsed }
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.regions.providers._
import scala.annotation.nowarn
import scala.concurrent.Future
class S3SourceSpec extends S3WireMockBase with S3ClientIntegrationSpec {
private val sampleSettings = S3Ext(system).settings
override protected def afterEach(): Unit =
mock.removeMappings()
"S3Source" should "download a stream of bytes from S3" in {
mockDownload()
// #download
val s3Source: Source[ByteString, Future[ObjectMetadata]] =
S3.getObject(bucket, bucketKey)
val (metadataFuture, dataFuture) =
s3Source.toMat(Sink.head)(Keep.both).run()
// #download
val data = dataFuture.futureValue
val metadata = metadataFuture.futureValue
data.utf8String shouldBe body
// #downloadToPekkoHttp
HttpResponse(
entity = HttpEntity(
metadata.contentType
.flatMap(ContentType.parse(_).toOption)
.getOrElse(ContentTypes.`application/octet-stream`),
metadata.contentLength,
s3Source))
// #downloadToPekkoHttp
}
"S3Source" should "use custom settings when downloading a file" in {
val region = Region.AP_NORTHEAST_1
mockDownload(region)
val customRegion = S3Ext(system).settings
.withS3RegionProvider(new AwsRegionProvider {
override def getRegion: Region = region
})
val data = S3
.getObject(bucket, bucketKey)
.withAttributes(S3Attributes.settings(customRegion))
data.map(_.utf8String).runWith(Sink.head).futureValue shouldBe body
}
"S3Source" should "download a metadata from S3" in {
val contentLength = 8
mockHead(contentLength)
// #objectMetadata
val metadata: Source[Option[ObjectMetadata], NotUsed] =
S3.getObjectMetadata(bucket, bucketKey)
// #objectMetadata
val Some(result) = metadata.runWith(Sink.head).futureValue: @nowarn("msg=match may not be exhaustive")
result.eTag shouldBe Some(etag)
result.contentLength shouldBe contentLength
result.versionId shouldBe empty
}
"S3Source" should "download a metadata from S3 for a big file" in {
val contentLength = Long.MaxValue
mockHead(contentLength)
val metadata = S3.getObjectMetadata(bucket, bucketKey)
val Some(result) = metadata.runWith(Sink.head).futureValue: @nowarn("msg=match may not be exhaustive")
result.eTag shouldBe Some(etag)
result.contentLength shouldBe contentLength
result.versionId shouldBe empty
}
"S3Source" should "download a metadata from S3 for specific version" in {
val versionId = "3/L4kqtJlcpXroDTDmJ+rmSpXd3dIbrHY+MTRCxf3vjVBH40Nr8X8gdRQBpUMLUo"
mockHeadWithVersion(versionId)
val metadata = S3.getObjectMetadata(bucket, bucketKey, Some(versionId))
val Some(result) = metadata.runWith(Sink.head).futureValue: @nowarn("msg=match may not be exhaustive")
result.eTag shouldBe Some(etag)
result.contentLength shouldBe 8
result.versionId.fold(fail("unable to get versionId from S3")) { vId =>
vId shouldEqual versionId
}
}
it should "download a metadata from S3 using server side encryption" in {
mockHeadSSEC()
val metadata = S3.getObjectMetadata(bucket, bucketKey, sse = Some(sseCustomerKeys))
val Some(result) = metadata.runWith(Sink.head).futureValue: @nowarn("msg=match may not be exhaustive")
result.eTag shouldBe Some(etagSSE)
result.contentLength shouldBe 8
}
it should "download a range of file's bytes from S3 if bytes range given" in {
mockRangedDownload()
// #rangedDownload
val s3Source = S3.getObject(bucket, bucketKey, Some(ByteRange(bytesRangeStart, bytesRangeEnd)))
// #rangedDownload
val result: Future[Array[Byte]] = s3Source.map(_.toArray).runWith(Sink.head)
result.futureValue shouldBe rangeOfBody
}
it should "download a stream of bytes using customer server side encryption" in {
mockDownloadSSEC()
val s3Source = S3.getObject(bucket, bucketKey, sse = Some(sseCustomerKeys))
val result = s3Source.map(_.utf8String).runWith(Sink.head)
result.futureValue shouldBe bodySSE
}
it should "download a stream of bytes using customer server side encryption with version" in {
val versionId = "3/L4kqtJlcpXroDTDmJ+rmSpXd3dIbrHY+MTRCxf3vjVBH40Nr8X8gdRQBpUMLUo"
mockDownloadSSECWithVersion(versionId)
val s3Source =
S3.getObject(bucket, bucketKey, versionId = Some(versionId), sse = Some(sseCustomerKeys))
val (metadata, result) = s3Source.map(_.utf8String).toMat(Sink.head)(Keep.both).run()
result.futureValue shouldBe bodySSE
metadata.futureValue.versionId.fold(fail("unable to get versionId from S3")) { vId =>
vId shouldEqual versionId
}
}
it should "throw the correct S3Exception if a request returns 404" in {
mock404s()
val download = S3
.getObject("nonexisting-bucket", "nonexisting_file.xml")
.runWith(Sink.head)
download.failed.futureValue should matchPattern {
case s3Exception: S3Exception if s3Exception.code == "NoSuchKey" =>
}
}
it should "fail for illegal bucket names" in {
val dnsStyleAccess = S3Ext(system).settings
.withAccessStyle(AccessStyle.PathAccessStyle)
.withEndpointUrl(null)
val download = S3
.getObject("path/../with-dots", "unused")
.withAttributes(S3Attributes.settings(dnsStyleAccess))
.runWith(Sink.head)
download.failed.futureValue shouldBe an[IllegalUriException]
}
it should "fail if download using server side encryption returns 'Invalid Request'" in {
mockSSEInvalidRequest()
val sse = ServerSideEncryption.customerKeys("encoded-key").withMd5("md5-encoded-key")
val result = S3
.getObject(bucket, bucketKey, sse = Some(sse))
.map(_.decodeString("utf8"))
.runWith(Sink.head)
whenReady(result.failed) { e =>
e shouldBe a[S3Exception]
e.asInstanceOf[S3Exception].code should equal("InvalidRequest")
}
}
it should "list keys for a given bucket with a prefix" in {
mockListBucket()
// #list-bucket
val keySource: Source[ListBucketResultContents, NotUsed] =
S3.listBucket(bucket, Some(listPrefix))
// #list-bucket
val result = keySource.runWith(Sink.head)
result.futureValue.key shouldBe listKey
}
it should "list keys for a given bucket with a prefix using the version 1 api" in {
mockListBucketVersion1()
// #list-bucket-attributes
val useVersion1Api = S3Ext(system).settings
.withListBucketApiVersion(ApiVersion.ListBucketVersion1)
val keySource: Source[ListBucketResultContents, NotUsed] =
S3.listBucket(bucket, Some(listPrefix))
.withAttributes(S3Attributes.settings(useVersion1Api))
// #list-bucket-attributes
val result = keySource.runWith(Sink.head)
result.futureValue.key shouldBe listKey
}
it should "list keys and common prefixes for a given bucket with a prefix and delimiter" in {
mockListBucketAndCommonPrefixes()
// #list-bucket-and-common-prefixes
val keyAndCommonPrefixSource
: Source[(Seq[ListBucketResultContents], Seq[ListBucketResultCommonPrefixes]), NotUsed] =
S3.listBucketAndCommonPrefixes(bucket, listDelimiter, Some(listPrefix))
// #list-bucket-and-common-prefixes
val result = keyAndCommonPrefixSource.runWith(Sink.head)
val contents = result.futureValue._1
val commonPrefixes = result.futureValue._2
contents.head.key shouldBe listKey
commonPrefixes.head.prefix shouldBe listCommonPrefix
}
it should "list keys and common prefixes for a given bucket with a prefix and delimiter using the version 1 api" in {
mockListBucketAndCommonPrefixesVersion1()
val useVersion1Api = S3Ext(system).settings
.withListBucketApiVersion(ApiVersion.ListBucketVersion1)
val keyAndCommonPrefixSource
: Source[(Seq[ListBucketResultContents], Seq[ListBucketResultCommonPrefixes]), NotUsed] =
S3.listBucketAndCommonPrefixes(bucket, listDelimiter, Some(listPrefix))
.withAttributes(S3Attributes.settings(useVersion1Api))
val result = keyAndCommonPrefixSource.runWith(Sink.head)
val contents = result.futureValue._1
val commonPrefixes = result.futureValue._2
contents.head.key shouldBe listKey
commonPrefixes.head.prefix shouldBe listCommonPrefix
}
it should "list keys for a given bucket with a delimiter and prefix" in {
mockListBucketAndCommonPrefixes()
// #list-bucket-delimiter
val keySource: Source[ListBucketResultContents, NotUsed] =
S3.listBucket(bucket, listDelimiter, Some(listPrefix))
// #list-bucket-delimiter
val result = keySource.runWith(Sink.head)
result.futureValue.key shouldBe listKey
}
it should "list keys for a given bucket with a delimiter and prefix using the version 1 api" in {
mockListBucketAndCommonPrefixesVersion1()
val useVersion1Api = S3Ext(system).settings
.withListBucketApiVersion(ApiVersion.ListBucketVersion1)
val keySource: Source[ListBucketResultContents, NotUsed] =
S3.listBucket(bucket, listDelimiter, Some(listPrefix))
.withAttributes(S3Attributes.settings(useVersion1Api))
val result = keySource.runWith(Sink.head)
result.futureValue.key shouldBe listKey
}
it should "make a bucket with given name" in {
mockMakingBucket()
// #make-bucket
val bucketName = "samplebucket1"
implicit val sampleAttributes: Attributes = S3Attributes.settings(sampleSettings)
val makeBucketRequest: Future[Done] = S3.makeBucket(bucketName)
val makeBucketSourceRequest: Source[Done, NotUsed] = S3.makeBucketSource(bucketName)
// #make-bucket
makeBucketRequest.futureValue shouldBe Done
makeBucketSourceRequest.runWith(Sink.ignore).futureValue shouldBe Done
}
it should "delete a bucket with given name" in {
val bucketName = "samplebucket1"
mockDeletingBucket()
// #delete-bucket
implicit val sampleAttributes: Attributes = S3Attributes.settings(sampleSettings)
val deleteBucketRequest: Future[Done] = S3.deleteBucket(bucketName)
val deleteBucketSourceRequest: Source[Done, NotUsed] = S3.deleteBucketSource(bucketName)
// #delete-bucket
deleteBucketRequest.futureValue shouldBe Done
deleteBucketSourceRequest.runWith(Sink.ignore).futureValue shouldBe Done
}
it should "check for non existing buckets" in {
mockCheckingBucketStateForNonExistingBucket()
// #check-if-bucket-exists
implicit val sampleAttributes: Attributes = S3Attributes.settings(sampleSettings)
val doesntExistRequest: Future[BucketAccess] = S3.checkIfBucketExists(bucket)
val doesntExistSourceRequest: Source[BucketAccess, NotUsed] = S3.checkIfBucketExistsSource(bucket)
// #check-if-bucket-exists
doesntExistRequest.futureValue shouldBe NotExists
doesntExistSourceRequest.runWith(Sink.head).futureValue shouldBe NotExists
}
it should "check for existing buckets" in {
mockCheckingBucketStateForExistingBucket()
val existRequest: Future[BucketAccess] = S3.checkIfBucketExists(bucket)
val existSourceRequest: Source[BucketAccess, NotUsed] = S3.checkIfBucketExistsSource(bucket)
existRequest.futureValue shouldBe AccessGranted
existSourceRequest.runWith(Sink.head).futureValue shouldBe AccessGranted
}
it should "check for buckets without rights" in {
mockCheckingBucketStateForBucketWithoutRights()
val noRightsRequest: Future[BucketAccess] = S3.checkIfBucketExists(bucket)
val noRightsSourceRequest: Source[BucketAccess, NotUsed] = S3.checkIfBucketExistsSource(bucket)
noRightsRequest.futureValue shouldBe AccessDenied
noRightsSourceRequest.runWith(Sink.head).futureValue shouldBe AccessDenied
}
override protected def afterAll(): Unit = {
super.afterAll()
stopWireMockServer()
}
}