Adding failure handling for connectors
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java b/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java
index 0ff87d4..e7480c5 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java
@@ -117,31 +117,37 @@
                 inConnector.init(srcCC);
                 outConnector.init(dstCC);
 
-                while(uploadLength < fileLength) {
+                try {
+                    while (uploadLength < fileLength) {
 
-                    long endPos = uploadLength + chunkSize;
-                    if (endPos > fileLength) {
-                        endPos = fileLength;
+                        long endPos = uploadLength + chunkSize;
+                        if (endPos > fileLength) {
+                            endPos = fileLength;
+                        }
+
+
+                        completionService.submit(new ChunkMover(inConnector,
+                                outConnector, uploadLength, endPos, chunkIdx,
+                                transferId, doChunkStreaming));
+
+                        uploadLength = endPos;
+                        chunkIdx++;
                     }
 
 
-                    completionService.submit(new ChunkMover(inConnector,
-                            outConnector, uploadLength, endPos, chunkIdx,
-                            transferId, doChunkStreaming));
+                    for (int i = 0; i < chunkIdx; i++) {
+                        Future<Integer> future = completionService.take();
+                    }
 
-                    uploadLength = endPos;
-                    chunkIdx++;
+                    inConnector.complete();
+                    outConnector.complete();
+                    logger.info("Completed chunked transfer for transfer {}", transferId);
+
+                } catch (Exception e) {
+                    inConnector.failed();
+                    outConnector.failed();
+                    throw e;
                 }
-
-
-                for (int i = 0; i < chunkIdx; i++) {
-                    Future<Integer> future = completionService.take();
-                }
-
-                inConnector.complete();
-                outConnector.complete();
-                logger.info("Completed chunked transfer for transfer {}", transferId);
-
             } else if (inStreamingConnectorOp.isPresent() && outStreamingConnectorOp.isPresent()) {
 
                 logger.info("Starting streaming transfer for transfer {}", transferId);
@@ -154,49 +160,55 @@
                 inConnector.init(srcCC);
                 outConnector.init(dstCC);
 
-                String srcChild = request.getSourceChildResourcePath();
-                String dstChild = request.getDestinationChildResourcePath();
+                try {
+                    String srcChild = request.getSourceChildResourcePath();
+                    String dstChild = request.getDestinationChildResourcePath();
 
-                InputStream inputStream = srcChild.equals("") ? inConnector.fetchInputStream() : inConnector.fetchInputStream(srcChild);
-                OutputStream outputStream = dstChild.equals("") ? outConnector.fetchOutputStream() : outConnector.fetchOutputStream(dstChild);
+                    InputStream inputStream = srcChild.equals("") ? inConnector.fetchInputStream() : inConnector.fetchInputStream(srcChild);
+                    OutputStream outputStream = dstChild.equals("") ? outConnector.fetchOutputStream() : outConnector.fetchOutputStream(dstChild);
 
-                long count = 0;
-                final AtomicLong countAtomic = new AtomicLong();
-                countAtomic.set(count);
-
-                monitorPool.submit(() -> {
-                    while (true) {
-                        try {
-                            Thread.sleep(2000);
-                        } catch (InterruptedException e) {
-                            // Ignore
-                        }
-                        if (!transferInProgress.get()) {
-                            logger.info("Status monitor is exiting for transfer {}", transferId);
-                            break;
-                        }
-                        double transferPercentage = countAtomic.get() * 100.0 / srcCC.getMetadata().getResourceSize();
-                        logger.info("Transfer percentage for transfer {} {}", transferId, transferPercentage);
-                        onStatusCallback.accept(transferId, new TransferState()
-                                .setPercentage(transferPercentage)
-                                .setState("RUNNING")
-                                .setUpdateTimeMils(System.currentTimeMillis())
-                                .setDescription("Transfer Progress Updated"));
-                    }
-                });
-
-                int n;
-                byte[] buffer = new byte[128 * 1024];
-                for (count = 0L; -1 != (n = inputStream.read(buffer)); count += (long) n) {
-                    outputStream.write(buffer, 0, n);
+                    long count = 0;
+                    final AtomicLong countAtomic = new AtomicLong();
                     countAtomic.set(count);
+
+                    monitorPool.submit(() -> {
+                        while (true) {
+                            try {
+                                Thread.sleep(2000);
+                            } catch (InterruptedException e) {
+                                // Ignore
+                            }
+                            if (!transferInProgress.get()) {
+                                logger.info("Status monitor is exiting for transfer {}", transferId);
+                                break;
+                            }
+                            double transferPercentage = countAtomic.get() * 100.0 / srcCC.getMetadata().getResourceSize();
+                            logger.info("Transfer percentage for transfer {} {}", transferId, transferPercentage);
+                            onStatusCallback.accept(transferId, new TransferState()
+                                    .setPercentage(transferPercentage)
+                                    .setState("RUNNING")
+                                    .setUpdateTimeMils(System.currentTimeMillis())
+                                    .setDescription("Transfer Progress Updated"));
+                        }
+                    });
+
+                    int n;
+                    byte[] buffer = new byte[128 * 1024];
+                    for (count = 0L; -1 != (n = inputStream.read(buffer)); count += (long) n) {
+                        outputStream.write(buffer, 0, n);
+                        countAtomic.set(count);
+                    }
+
+                    inConnector.complete();
+                    outConnector.complete();
+
+                    logger.info("Completed streaming transfer for transfer {}", transferId);
+                } catch (Exception e) {
+                    inConnector.failed();
+                    outConnector.failed();
+                    throw e;
                 }
 
-                inConnector.complete();
-                outConnector.complete();
-
-                logger.info("Completed streaming transfer for transfer {}", transferId);
-
             } else {
                 throw new Exception("No matching connector found to perform the transfer");
             }
@@ -224,6 +236,7 @@
                     .setState("FAILED")
                     .setUpdateTimeMils(System.currentTimeMillis())
                     .setDescription("Transfer failed due to " + ExceptionUtils.getStackTrace(e)));
+            exitingCallback.accept(transferId, false);
         } finally {
             transferInProgress.set(false);
         }
diff --git a/core/src/main/java/org/apache/airavata/mft/core/api/BasicConnector.java b/core/src/main/java/org/apache/airavata/mft/core/api/BasicConnector.java
index 1ca72d6..52d85eb 100644
--- a/core/src/main/java/org/apache/airavata/mft/core/api/BasicConnector.java
+++ b/core/src/main/java/org/apache/airavata/mft/core/api/BasicConnector.java
@@ -3,4 +3,5 @@
 public interface BasicConnector {
     public void init(ConnectorConfig connectorConfig) throws Exception;
     public void complete() throws Exception;
+    public void failed() throws Exception;
 }
diff --git a/core/src/main/java/org/apache/airavata/mft/core/api/IncomingStreamingConnector.java b/core/src/main/java/org/apache/airavata/mft/core/api/IncomingStreamingConnector.java
index 900b148..3bc650a 100644
--- a/core/src/main/java/org/apache/airavata/mft/core/api/IncomingStreamingConnector.java
+++ b/core/src/main/java/org/apache/airavata/mft/core/api/IncomingStreamingConnector.java
@@ -19,9 +19,7 @@
 
 import java.io.InputStream;
 
-public interface IncomingStreamingConnector {
-    public void init(ConnectorConfig connectorConfig) throws Exception;
+public interface IncomingStreamingConnector extends BasicConnector {
     public InputStream fetchInputStream() throws Exception;
     public InputStream fetchInputStream(String childPath) throws Exception;
-    public void complete() throws Exception;
 }
diff --git a/transport/odata-transport/src/main/java/org/apache/airavata/mft/transport/odata/ODataIncomingConnector.java b/transport/odata-transport/src/main/java/org/apache/airavata/mft/transport/odata/ODataIncomingConnector.java
index 3626885..b8415be 100644
--- a/transport/odata-transport/src/main/java/org/apache/airavata/mft/transport/odata/ODataIncomingConnector.java
+++ b/transport/odata-transport/src/main/java/org/apache/airavata/mft/transport/odata/ODataIncomingConnector.java
@@ -113,4 +113,9 @@
             client.close();
         }
     }
+
+    @Override
+    public void failed() throws Exception {
+
+    }
 }
diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3IncomingConnector.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3IncomingConnector.java
index d09ffa2..015470d 100644
--- a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3IncomingConnector.java
+++ b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3IncomingConnector.java
@@ -116,4 +116,9 @@
     public void complete() throws Exception {
 
     }
+
+    @Override
+    public void failed() throws Exception {
+
+    }
 }
diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3OutgoingConnector.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3OutgoingConnector.java
index 617e50c..01a2298 100644
--- a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3OutgoingConnector.java
+++ b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3OutgoingConnector.java
@@ -130,4 +130,9 @@
         logger.info("Completing the upload for file {} in bucket {}", resource.getFile().getResourcePath(),
                 resource.getS3Storage().getBucketName());
     }
+
+    @Override
+    public void failed() throws Exception {
+
+    }
 }
diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3OutgoingStreamingConnector.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3OutgoingStreamingConnector.java
index a039855..fa20539 100644
--- a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3OutgoingStreamingConnector.java
+++ b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3OutgoingStreamingConnector.java
@@ -109,6 +109,11 @@
     }
 
     @Override
+    public void failed() throws Exception {
+
+    }
+
+    @Override
     public OutputStream fetchOutputStream() throws Exception {
         this.s3OutputStream = S3OutputStream.builder()
                 .s3(s3)
diff --git a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPIncomingConnector.java b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPIncomingConnector.java
index d6bacdb..1ec6cd1 100644
--- a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPIncomingConnector.java
+++ b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPIncomingConnector.java
@@ -204,6 +204,11 @@
         session.disconnect();
     }
 
+    @Override
+    public void failed() throws Exception {
+
+    }
+
     private int checkAck(InputStream in) throws IOException {
         int b = in.read();
         // b may be 0 for success,
diff --git a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPOutgoingConnector.java b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPOutgoingConnector.java
index 28ea5e0..48044aa 100644
--- a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPOutgoingConnector.java
+++ b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPOutgoingConnector.java
@@ -198,6 +198,11 @@
         session.disconnect();
     }
 
+    @Override
+    public void failed() throws Exception {
+
+    }
+
     public int checkAck(InputStream in) throws IOException {
         int b = in.read();
         // b may be 0 for success,
diff --git a/transport/swift-transport/src/main/java/org/apache/airavata/mft/transport/swift/SwiftIncomingConnector.java b/transport/swift-transport/src/main/java/org/apache/airavata/mft/transport/swift/SwiftIncomingConnector.java
index 47efd38..7c195b7 100644
--- a/transport/swift-transport/src/main/java/org/apache/airavata/mft/transport/swift/SwiftIncomingConnector.java
+++ b/transport/swift-transport/src/main/java/org/apache/airavata/mft/transport/swift/SwiftIncomingConnector.java
@@ -119,6 +119,11 @@
     }
 
     @Override
+    public void failed() throws Exception {
+
+    }
+
+    @Override
     public void downloadChunk(int chunkId, long startByte, long endByte, String downloadFile) throws Exception {
         SwiftObject swiftObject = objectApi.get(
                 resource.getFile().getResourcePath(),
diff --git a/transport/swift-transport/src/main/java/org/apache/airavata/mft/transport/swift/SwiftOutgoingConnector.java b/transport/swift-transport/src/main/java/org/apache/airavata/mft/transport/swift/SwiftOutgoingConnector.java
index 3734e8b..38fd79a 100644
--- a/transport/swift-transport/src/main/java/org/apache/airavata/mft/transport/swift/SwiftOutgoingConnector.java
+++ b/transport/swift-transport/src/main/java/org/apache/airavata/mft/transport/swift/SwiftOutgoingConnector.java
@@ -133,6 +133,11 @@
     }
 
     @Override
+    public void failed() throws Exception {
+
+    }
+
+    @Override
     public void uploadChunk(int chunkId, long startByte, long endByte, String uploadFile) throws Exception {
         InputStream fis = new FileInputStream(uploadFile);
         uploadChunk(chunkId, startByte, endByte, fis);