Local incoming chunked update
diff --git a/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalIncomingChunkedConnector.java b/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalIncomingChunkedConnector.java
index 91f5de6..dd388ab 100644
--- a/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalIncomingChunkedConnector.java
+++ b/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalIncomingChunkedConnector.java
@@ -21,22 +21,24 @@
import org.apache.airavata.mft.core.api.ConnectorConfig;
import org.apache.airavata.mft.core.api.IncomingChunkedConnector;
-import java.io.InputStream;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.File;
+import java.io.*;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class LocalIncomingChunkedConnector implements IncomingChunkedConnector {
private String resourcePath;
+ private long resourceSize;
private static final Logger logger = LoggerFactory.getLogger(LocalIncomingChunkedConnector.class);
@Override
public void init(ConnectorConfig connectorConfig) throws Exception {
this.resourcePath = connectorConfig.getResourcePath();
+ this.resourceSize = connectorConfig.getMetadata().getFile().getResourceSize();
}
@Override
@@ -53,45 +55,40 @@
@Override
public void downloadChunk(int chunkId, long startByte, long endByte, String downloadFile) throws Exception {
- FileInputStream from = new FileInputStream(new File(this.resourcePath));
- FileOutputStream to = new FileOutputStream(new File(downloadFile));
+ logger.info("Downloading chunk {} with start byte {} and end byte {} to file {} from resource path {}",
+ chunkId, startByte, endByte, downloadFile, this.resourcePath);
- final int buffLen = 1024;
+// #use this code on a DMA enabled device
+// if (resourceSize <= endByte - startByte) {
+// Files.copy(Path.of(this.resourcePath), Path.of(downloadFile));
+// } else {
+// try (FileInputStream from = new FileInputStream(this.resourcePath);
+// FileOutputStream to = new FileOutputStream(downloadFile)) {
+// from.getChannel().transferTo(startByte, endByte - startByte, to.getChannel());
+// } catch (Exception e) {
+// logger.error("Unexpected error occurred while downloading chunk {} to file {} from resource path {}",
+// chunkId, downloadFile, this.resourcePath, e);
+// throw e;
+// }
+// }
- byte[] buf = new byte[buffLen];
-
- from.skip(startByte);
-
- long fileSize = endByte - startByte + 1;
-
- while (true) {
- int bufSize = 0;
-
- if (buffLen < fileSize) {
- bufSize = buffLen;
- } else {
- bufSize = (int) fileSize;
+ int buffLen = 1024 * 1024 * 16;
+ try (BufferedInputStream bis = new BufferedInputStream(new FileInputStream(this.resourcePath),buffLen);
+ BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(downloadFile))) {
+ byte[] buffer = new byte[buffLen];
+ int read = 0;
+ long totalRead = bis.skip(startByte);
+ while ((read = bis.read(buffer,0,Math.min(buffLen, (int) (endByte - totalRead )))) > 0) {
+ bos.write(buffer, 0, read);
+ totalRead += read;
}
-
- bufSize = (int) from.read(buf, 0, bufSize);
-
- if (bufSize < 0) {
- break;
- }
-
- to.write(buf, 0, bufSize);
- to.flush();
-
- fileSize -= bufSize;
-
- if (fileSize == 0L) {
- break;
- }
+ bis.close();
+ bos.close();
+ } catch (Exception e) {
+ logger.error("Unexpected error occurred while downloading chunk {} to file {} from resource path {}",
+ chunkId, downloadFile, this.resourcePath, e);
+ throw e;
}
-
- from.close();
- to.close();
-
}
@Override
@@ -101,6 +98,6 @@
from.skip(startByte);
- return from;
+ return new BufferedInputStream(from, Math.min(16 * 1024 * 1024,(int) (endByte - startByte)));
}
}