[ENHANCEMENT] AESBlobStoreDAO::readBytes: remove 1 file copy
diff --git a/server/blob/blob-aes/src/main/java/org/apache/james/blob/aes/AESBlobStoreDAO.java b/server/blob/blob-aes/src/main/java/org/apache/james/blob/aes/AESBlobStoreDAO.java
index a73495e..b8d7478 100644
--- a/server/blob/blob-aes/src/main/java/org/apache/james/blob/aes/AESBlobStoreDAO.java
+++ b/server/blob/blob-aes/src/main/java/org/apache/james/blob/aes/AESBlobStoreDAO.java
@@ -23,8 +23,6 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.nio.channels.Channels;
-import java.nio.channels.WritableByteChannel;
import java.security.GeneralSecurityException;
import java.util.Collection;
import java.util.Optional;
@@ -36,6 +34,7 @@
import org.apache.james.blob.api.BucketName;
import org.apache.james.blob.api.ObjectNotFoundException;
import org.apache.james.blob.api.ObjectStoreIOException;
+import org.apache.james.util.ReactorUtils;
import org.apache.james.util.Size;
import org.reactivestreams.Publisher;
import org.slf4j.LoggerFactory;
@@ -104,18 +103,12 @@
throw new RuntimeException(blobId.asString() + " exceeded maximum blob size");
}
- FileBackedOutputStream encryptedContent = new FileBackedOutputStream(FILE_THRESHOLD_DECRYPT);
- WritableByteChannel channel = Channels.newChannel(encryptedContent);
-
- return Flux.from(ciphertext.getContent())
- .publishOn(Schedulers.boundedElastic())
- .doOnNext(Throwing.consumer(channel::write))
- .then(Mono.fromCallable(() -> {
+ return Mono.fromCallable(() -> {
try {
FileBackedOutputStream decryptedContent = new FileBackedOutputStream(FILE_THRESHOLD_DECRYPT);
try {
CountingOutputStream countingOutputStream = new CountingOutputStream(decryptedContent);
- try (InputStream ciphertextStream = encryptedContent.asByteSource().openStream()) {
+ try (InputStream ciphertextStream = ReactorUtils.toInputStream(Flux.from(ciphertext.getContent()))) {
decrypt(ciphertextStream).transferTo(countingOutputStream);
}
try (InputStream decryptedStream = decryptedContent.asByteSource().openStream()) {
@@ -130,12 +123,7 @@
.error("OOM reading {}. Blob size read so far {} bytes.", blobId.asString(), ciphertext.getSize());
throw error;
}
- }))
- .doFinally(Throwing.consumer(any -> {
- channel.close();
- encryptedContent.reset();
- encryptedContent.close();
- }));
+ });
}
@Override
@@ -157,7 +145,7 @@
public Publisher<byte[]> readBytes(BucketName bucketName, BlobId blobId) {
return Mono.from(underlying.readAsByteSource(bucketName, blobId))
.flatMap(reactiveByteSource -> decryptReactiveByteSource(reactiveByteSource, blobId))
- .subscribeOn(Schedulers.boundedElastic());
+ .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER);
}
@Override