[Fix] AESBlobStore threading
Was blocking on the S3 driver Netty EventLoop and on the parrallel scheduler
(verified with printing thread names)
diff --git a/server/blob/blob-aes/src/main/java/org/apache/james/blob/aes/AESBlobStoreDAO.java b/server/blob/blob-aes/src/main/java/org/apache/james/blob/aes/AESBlobStoreDAO.java
index ca1cbee..a73495e 100644
--- a/server/blob/blob-aes/src/main/java/org/apache/james/blob/aes/AESBlobStoreDAO.java
+++ b/server/blob/blob-aes/src/main/java/org/apache/james/blob/aes/AESBlobStoreDAO.java
@@ -108,6 +108,7 @@
WritableByteChannel channel = Channels.newChannel(encryptedContent);
return Flux.from(ciphertext.getContent())
+ .publishOn(Schedulers.boundedElastic())
.doOnNext(Throwing.consumer(channel::write))
.then(Mono.fromCallable(() -> {
try {
@@ -174,10 +175,10 @@
Preconditions.checkNotNull(blobId);
Preconditions.checkNotNull(inputStream);
- return Mono.using(
- () -> encrypt(inputStream),
+ return Mono.usingWhen(
+ Mono.fromCallable(() -> encrypt(inputStream)),
pair -> Mono.from(underlying.save(bucketName, blobId, byteSourceWithSize(pair.getLeft().asByteSource(), pair.getRight()))),
- Throwing.consumer(pair -> pair.getLeft().reset()))
+ Throwing.function(pair -> Mono.fromRunnable(Throwing.runnable(pair.getLeft()::reset)).subscribeOn(Schedulers.boundedElastic())))
.subscribeOn(Schedulers.boundedElastic())
.onErrorMap(e -> new ObjectStoreIOException("Exception occurred while saving bytearray", e));
}