Adding failure handling for connectors
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java b/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java
index 0ff87d4..e7480c5 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java
@@ -117,31 +117,37 @@
inConnector.init(srcCC);
outConnector.init(dstCC);
- while(uploadLength < fileLength) {
+ try {
+ while (uploadLength < fileLength) {
- long endPos = uploadLength + chunkSize;
- if (endPos > fileLength) {
- endPos = fileLength;
+ long endPos = uploadLength + chunkSize;
+ if (endPos > fileLength) {
+ endPos = fileLength;
+ }
+
+
+ completionService.submit(new ChunkMover(inConnector,
+ outConnector, uploadLength, endPos, chunkIdx,
+ transferId, doChunkStreaming));
+
+ uploadLength = endPos;
+ chunkIdx++;
}
- completionService.submit(new ChunkMover(inConnector,
- outConnector, uploadLength, endPos, chunkIdx,
- transferId, doChunkStreaming));
+ for (int i = 0; i < chunkIdx; i++) {
+ Future<Integer> future = completionService.take();
+ }
- uploadLength = endPos;
- chunkIdx++;
+ inConnector.complete();
+ outConnector.complete();
+ logger.info("Completed chunked transfer for transfer {}", transferId);
+
+ } catch (Exception e) {
+ inConnector.failed();
+ outConnector.failed();
+ throw e;
}
-
-
- for (int i = 0; i < chunkIdx; i++) {
- Future<Integer> future = completionService.take();
- }
-
- inConnector.complete();
- outConnector.complete();
- logger.info("Completed chunked transfer for transfer {}", transferId);
-
} else if (inStreamingConnectorOp.isPresent() && outStreamingConnectorOp.isPresent()) {
logger.info("Starting streaming transfer for transfer {}", transferId);
@@ -154,49 +160,55 @@
inConnector.init(srcCC);
outConnector.init(dstCC);
- String srcChild = request.getSourceChildResourcePath();
- String dstChild = request.getDestinationChildResourcePath();
+ try {
+ String srcChild = request.getSourceChildResourcePath();
+ String dstChild = request.getDestinationChildResourcePath();
- InputStream inputStream = srcChild.equals("") ? inConnector.fetchInputStream() : inConnector.fetchInputStream(srcChild);
- OutputStream outputStream = dstChild.equals("") ? outConnector.fetchOutputStream() : outConnector.fetchOutputStream(dstChild);
+ InputStream inputStream = srcChild.equals("") ? inConnector.fetchInputStream() : inConnector.fetchInputStream(srcChild);
+ OutputStream outputStream = dstChild.equals("") ? outConnector.fetchOutputStream() : outConnector.fetchOutputStream(dstChild);
- long count = 0;
- final AtomicLong countAtomic = new AtomicLong();
- countAtomic.set(count);
-
- monitorPool.submit(() -> {
- while (true) {
- try {
- Thread.sleep(2000);
- } catch (InterruptedException e) {
- // Ignore
- }
- if (!transferInProgress.get()) {
- logger.info("Status monitor is exiting for transfer {}", transferId);
- break;
- }
- double transferPercentage = countAtomic.get() * 100.0 / srcCC.getMetadata().getResourceSize();
- logger.info("Transfer percentage for transfer {} {}", transferId, transferPercentage);
- onStatusCallback.accept(transferId, new TransferState()
- .setPercentage(transferPercentage)
- .setState("RUNNING")
- .setUpdateTimeMils(System.currentTimeMillis())
- .setDescription("Transfer Progress Updated"));
- }
- });
-
- int n;
- byte[] buffer = new byte[128 * 1024];
- for (count = 0L; -1 != (n = inputStream.read(buffer)); count += (long) n) {
- outputStream.write(buffer, 0, n);
+ long count = 0;
+ final AtomicLong countAtomic = new AtomicLong();
countAtomic.set(count);
+
+ monitorPool.submit(() -> {
+ while (true) {
+ try {
+ Thread.sleep(2000);
+ } catch (InterruptedException e) {
+ // Ignore
+ }
+ if (!transferInProgress.get()) {
+ logger.info("Status monitor is exiting for transfer {}", transferId);
+ break;
+ }
+ double transferPercentage = countAtomic.get() * 100.0 / srcCC.getMetadata().getResourceSize();
+ logger.info("Transfer percentage for transfer {} {}", transferId, transferPercentage);
+ onStatusCallback.accept(transferId, new TransferState()
+ .setPercentage(transferPercentage)
+ .setState("RUNNING")
+ .setUpdateTimeMils(System.currentTimeMillis())
+ .setDescription("Transfer Progress Updated"));
+ }
+ });
+
+ int n;
+ byte[] buffer = new byte[128 * 1024];
+ for (count = 0L; -1 != (n = inputStream.read(buffer)); count += (long) n) {
+ outputStream.write(buffer, 0, n);
+ countAtomic.set(count);
+ }
+
+ inConnector.complete();
+ outConnector.complete();
+
+ logger.info("Completed streaming transfer for transfer {}", transferId);
+ } catch (Exception e) {
+ inConnector.failed();
+ outConnector.failed();
+ throw e;
}
- inConnector.complete();
- outConnector.complete();
-
- logger.info("Completed streaming transfer for transfer {}", transferId);
-
} else {
throw new Exception("No matching connector found to perform the transfer");
}
@@ -224,6 +236,7 @@
.setState("FAILED")
.setUpdateTimeMils(System.currentTimeMillis())
.setDescription("Transfer failed due to " + ExceptionUtils.getStackTrace(e)));
+ exitingCallback.accept(transferId, false);
} finally {
transferInProgress.set(false);
}
diff --git a/core/src/main/java/org/apache/airavata/mft/core/api/BasicConnector.java b/core/src/main/java/org/apache/airavata/mft/core/api/BasicConnector.java
index 1ca72d6..52d85eb 100644
--- a/core/src/main/java/org/apache/airavata/mft/core/api/BasicConnector.java
+++ b/core/src/main/java/org/apache/airavata/mft/core/api/BasicConnector.java
@@ -3,4 +3,5 @@
public interface BasicConnector {
public void init(ConnectorConfig connectorConfig) throws Exception;
public void complete() throws Exception;
+ public void failed() throws Exception;
}
diff --git a/core/src/main/java/org/apache/airavata/mft/core/api/IncomingStreamingConnector.java b/core/src/main/java/org/apache/airavata/mft/core/api/IncomingStreamingConnector.java
index 900b148..3bc650a 100644
--- a/core/src/main/java/org/apache/airavata/mft/core/api/IncomingStreamingConnector.java
+++ b/core/src/main/java/org/apache/airavata/mft/core/api/IncomingStreamingConnector.java
@@ -19,9 +19,7 @@
import java.io.InputStream;
-public interface IncomingStreamingConnector {
- public void init(ConnectorConfig connectorConfig) throws Exception;
+public interface IncomingStreamingConnector extends BasicConnector {
public InputStream fetchInputStream() throws Exception;
public InputStream fetchInputStream(String childPath) throws Exception;
- public void complete() throws Exception;
}
diff --git a/transport/odata-transport/src/main/java/org/apache/airavata/mft/transport/odata/ODataIncomingConnector.java b/transport/odata-transport/src/main/java/org/apache/airavata/mft/transport/odata/ODataIncomingConnector.java
index 3626885..b8415be 100644
--- a/transport/odata-transport/src/main/java/org/apache/airavata/mft/transport/odata/ODataIncomingConnector.java
+++ b/transport/odata-transport/src/main/java/org/apache/airavata/mft/transport/odata/ODataIncomingConnector.java
@@ -113,4 +113,9 @@
client.close();
}
}
+
+ @Override
+ public void failed() throws Exception {
+
+ }
}
diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3IncomingConnector.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3IncomingConnector.java
index d09ffa2..015470d 100644
--- a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3IncomingConnector.java
+++ b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3IncomingConnector.java
@@ -116,4 +116,9 @@
public void complete() throws Exception {
}
+
+ @Override
+ public void failed() throws Exception {
+
+ }
}
diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3OutgoingConnector.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3OutgoingConnector.java
index 617e50c..01a2298 100644
--- a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3OutgoingConnector.java
+++ b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3OutgoingConnector.java
@@ -130,4 +130,9 @@
logger.info("Completing the upload for file {} in bucket {}", resource.getFile().getResourcePath(),
resource.getS3Storage().getBucketName());
}
+
+ @Override
+ public void failed() throws Exception {
+
+ }
}
diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3OutgoingStreamingConnector.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3OutgoingStreamingConnector.java
index a039855..fa20539 100644
--- a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3OutgoingStreamingConnector.java
+++ b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3OutgoingStreamingConnector.java
@@ -109,6 +109,11 @@
}
@Override
+ public void failed() throws Exception {
+
+ }
+
+ @Override
public OutputStream fetchOutputStream() throws Exception {
this.s3OutputStream = S3OutputStream.builder()
.s3(s3)
diff --git a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPIncomingConnector.java b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPIncomingConnector.java
index d6bacdb..1ec6cd1 100644
--- a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPIncomingConnector.java
+++ b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPIncomingConnector.java
@@ -204,6 +204,11 @@
session.disconnect();
}
+ @Override
+ public void failed() throws Exception {
+
+ }
+
private int checkAck(InputStream in) throws IOException {
int b = in.read();
// b may be 0 for success,
diff --git a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPOutgoingConnector.java b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPOutgoingConnector.java
index 28ea5e0..48044aa 100644
--- a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPOutgoingConnector.java
+++ b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPOutgoingConnector.java
@@ -198,6 +198,11 @@
session.disconnect();
}
+ @Override
+ public void failed() throws Exception {
+
+ }
+
public int checkAck(InputStream in) throws IOException {
int b = in.read();
// b may be 0 for success,
diff --git a/transport/swift-transport/src/main/java/org/apache/airavata/mft/transport/swift/SwiftIncomingConnector.java b/transport/swift-transport/src/main/java/org/apache/airavata/mft/transport/swift/SwiftIncomingConnector.java
index 47efd38..7c195b7 100644
--- a/transport/swift-transport/src/main/java/org/apache/airavata/mft/transport/swift/SwiftIncomingConnector.java
+++ b/transport/swift-transport/src/main/java/org/apache/airavata/mft/transport/swift/SwiftIncomingConnector.java
@@ -119,6 +119,11 @@
}
@Override
+ public void failed() throws Exception {
+
+ }
+
+ @Override
public void downloadChunk(int chunkId, long startByte, long endByte, String downloadFile) throws Exception {
SwiftObject swiftObject = objectApi.get(
resource.getFile().getResourcePath(),
diff --git a/transport/swift-transport/src/main/java/org/apache/airavata/mft/transport/swift/SwiftOutgoingConnector.java b/transport/swift-transport/src/main/java/org/apache/airavata/mft/transport/swift/SwiftOutgoingConnector.java
index 3734e8b..38fd79a 100644
--- a/transport/swift-transport/src/main/java/org/apache/airavata/mft/transport/swift/SwiftOutgoingConnector.java
+++ b/transport/swift-transport/src/main/java/org/apache/airavata/mft/transport/swift/SwiftOutgoingConnector.java
@@ -133,6 +133,11 @@
}
@Override
+ public void failed() throws Exception {
+
+ }
+
+ @Override
public void uploadChunk(int chunkId, long startByte, long endByte, String uploadFile) throws Exception {
InputStream fis = new FileInputStream(uploadFile);
uploadChunk(chunkId, startByte, endByte, fis);