exclude log4j-slf4j-impl jar
diff --git a/admin/src/main/java/org/apache/airavata/mft/admin/SyncRPCClient.java b/admin/src/main/java/org/apache/airavata/mft/admin/SyncRPCClient.java
index 19fd30e..9e2e1f6 100644
--- a/admin/src/main/java/org/apache/airavata/mft/admin/SyncRPCClient.java
+++ b/admin/src/main/java/org/apache/airavata/mft/admin/SyncRPCClient.java
@@ -96,11 +96,13 @@
this.responseQueueMap.put(request.getMessageId(), queue);
try {
+ logger.info("Requesting sync request {} on agent {}", request.getRequestId(), request.getAgentId());
this.mftConsulClient.sendSyncRPCToAgent(request.getAgentId(), request);
SyncRPCResponse response = queue.poll(waitMs, TimeUnit.MILLISECONDS);
if (response == null) {
throw new MFTConsulClientException("Timed out waiting for the response");
}
+ logger.info("Completing sync request {} on agent {}", request.getRequestId(), request.getAgentId());
return response;
} finally {
this.responseQueueMap.remove(request.getMessageId());
@@ -108,6 +110,6 @@
}
public SyncRPCResponse sendSyncRequest(SyncRPCRequest request) throws MFTConsulClientException, InterruptedException {
- return sendSyncRequest(request, 10000);
+ return sendSyncRequest(request, 100000);
}
}
diff --git a/admin/src/main/java/org/apache/airavata/mft/admin/models/rpc/SyncRPCRequest.java b/admin/src/main/java/org/apache/airavata/mft/admin/models/rpc/SyncRPCRequest.java
index 1aadc83..0d0bdff 100644
--- a/admin/src/main/java/org/apache/airavata/mft/admin/models/rpc/SyncRPCRequest.java
+++ b/admin/src/main/java/org/apache/airavata/mft/admin/models/rpc/SyncRPCRequest.java
@@ -19,8 +19,10 @@
import java.util.HashMap;
import java.util.Map;
+import java.util.UUID;
public class SyncRPCRequest {
+ private String requestId = UUID.randomUUID().toString();
private String agentId;
private String method;
private Map<String, String> parameters;
@@ -72,6 +74,14 @@
return this;
}
+ public String getRequestId() {
+ return requestId;
+ }
+
+ public void setRequestId(String requestId) {
+ this.requestId = requestId;
+ }
+
public static final class SyncRPCRequestBuilder {
private String agentId;
private String method;
diff --git a/agent/pom.xml b/agent/pom.xml
index 7d6765e..b7a2ce2 100644
--- a/agent/pom.xml
+++ b/agent/pom.xml
@@ -37,6 +37,12 @@
<groupId>org.apache.airavata</groupId>
<artifactId>mft-scp-transport</artifactId>
<version>0.01-SNAPSHOT</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.custos</groupId>
+ <artifactId>custos-java-sdk</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.airavata</groupId>
@@ -87,6 +93,10 @@
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </exclusion>
</exclusions>
</dependency>
<dependency>
@@ -95,6 +105,16 @@
<version>2.10.0</version>
</dependency>
<dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ <version>2.10.0</version>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.dataformat</groupId>
+ <artifactId>jackson-dataformat-xml</artifactId>
+ <version>2.10.0</version>
+ </dependency>
+ <dependency>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
<version>${log4j.over.slf4j}</version>
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java b/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
index 5ccf25a..fc670c5 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
@@ -35,9 +35,9 @@
import org.apache.airavata.mft.agent.rpc.RPCParser;
import org.apache.airavata.mft.api.service.CallbackEndpoint;
import org.apache.airavata.mft.api.service.TransferApiRequest;
-import org.apache.airavata.mft.core.ConnectorResolver;
+import org.apache.airavata.mft.core.FileResourceMetadata;
import org.apache.airavata.mft.core.MetadataCollectorResolver;
-import org.apache.airavata.mft.core.api.Connector;
+import org.apache.airavata.mft.core.api.ConnectorConfig;
import org.apache.airavata.mft.core.api.MetadataCollector;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
@@ -65,8 +65,6 @@
private static final Logger logger = LoggerFactory.getLogger(MFTAgent.class);
- private final TransportMediator mediator = new TransportMediator();
-
@org.springframework.beans.factory.annotation.Value("${agent.id}")
private String agentId;
@@ -88,6 +86,9 @@
@org.springframework.beans.factory.annotation.Value("${agent.supported.protocols}")
private String supportedProtocols;
+ @org.springframework.beans.factory.annotation.Value("${agent.temp.data.dir}")
+ private String tempDataDir = "/tmp";
+
@org.springframework.beans.factory.annotation.Value("${resource.service.host}")
private String resourceServiceHost;
@@ -113,6 +114,9 @@
private long sessionTTLSeconds = 10;
private String session;
+ private TransportMediator mediator;
+
+
private ObjectMapper mapper = new ObjectMapper();
@Autowired
@@ -127,6 +131,7 @@
public void init() {
transferMessageCache = KVCache.newCache(mftConsulClient.getKvClient(), MFTConsulClient.AGENTS_TRANSFER_REQUEST_MESSAGE_PATH + agentId);
rpcMessageCache = KVCache.newCache(mftConsulClient.getKvClient(), MFTConsulClient.AGENTS_RPC_REQUEST_MESSAGE_PATH + agentId);
+ mediator = new TransportMediator(tempDataDir);
}
private void acceptRPCRequests() {
@@ -172,14 +177,6 @@
.setPublisher(agentId)
.setDescription("Starting the transfer"));
- Optional<Connector> inConnectorOpt = ConnectorResolver.resolveConnector(request.getSourceType(), "IN");
- Connector inConnector = inConnectorOpt.orElseThrow(() -> new Exception("Could not find an in connector for given input"));
- inConnector.init(resourceServiceHost, resourceServicePort, secretServiceHost, secretServicePort);
-
- Optional<Connector> outConnectorOpt = ConnectorResolver.resolveConnector(request.getDestinationType(), "OUT");
- Connector outConnector = outConnectorOpt.orElseThrow(() -> new Exception("Could not find an out connector for given input"));
- outConnector.init(resourceServiceHost, resourceServicePort, secretServiceHost, secretServicePort);
-
Optional<MetadataCollector> srcMetadataCollectorOp = MetadataCollectorResolver.resolveMetadataCollector(request.getSourceType());
MetadataCollector srcMetadataCollector = srcMetadataCollectorOp.orElseThrow(() -> new Exception("Could not find a metadata collector for source"));
srcMetadataCollector.init(resourceServiceHost, resourceServicePort, secretServiceHost, secretServicePort);
@@ -188,6 +185,34 @@
MetadataCollector dstMetadataCollector = dstMetadataCollectorOp.orElseThrow(() -> new Exception("Could not find a metadata collector for destination"));
dstMetadataCollector.init(resourceServiceHost, resourceServicePort, secretServiceHost, secretServicePort);
+ FileResourceMetadata srcMetadata = srcMetadataCollector.getFileResourceMetadata(
+ request.getMftAuthorizationToken(),
+ request.getSourceResourceId(),
+ request.getSourceToken());
+
+
+ ConnectorConfig srcCC = ConnectorConfig.ConnectorConfigBuilder.newBuilder()
+ .withAuthToken(request.getMftAuthorizationToken())
+ .withResourceServiceHost(resourceServiceHost)
+ .withResourceServicePort(resourceServicePort)
+ .withSecretServiceHost(secretServiceHost)
+ .withSecretServicePort(secretServicePort)
+ .withTransferId(transferId)
+ .withResourceId(request.getSourceResourceId())
+ .withCredentialToken(request.getSourceToken())
+ .withMetadata(srcMetadata).build();
+
+ ConnectorConfig dstCC = ConnectorConfig.ConnectorConfigBuilder.newBuilder()
+ .withAuthToken(request.getMftAuthorizationToken())
+ .withResourceServiceHost(resourceServiceHost)
+ .withResourceServicePort(resourceServicePort)
+ .withSecretServiceHost(secretServiceHost)
+ .withSecretServicePort(secretServicePort)
+ .withTransferId(transferId)
+ .withResourceId(request.getDestinationResourceId())
+ .withCredentialToken(request.getDestinationToken())
+ .withMetadata(srcMetadata).build();
+
mftConsulClient.submitTransferStateToProcess(transferId, agentId, new TransferState()
.setState("STARTED")
.setPercentage(0)
@@ -195,13 +220,11 @@
.setPublisher(agentId)
.setDescription("Started the transfer"));
-
- TransferApiRequest finalRequest = request;
- mediator.transfer(transferId, request, inConnector, outConnector, srcMetadataCollector, dstMetadataCollector,
+ mediator.transferSingleThread(transferId, request, srcCC, dstCC,
(id, st) -> {
try {
mftConsulClient.submitTransferStateToProcess(id, agentId, st.setPublisher(agentId));
- handleCallbacks(finalRequest.getCallbackEndpointsList(), id, st);
+
} catch (MFTConsulClientException e) {
logger.error("Failed while updating transfer state", e);
}
@@ -213,8 +236,7 @@
} catch (Exception e) {
logger.error("Failed while deleting scheduled path for transfer {}", id);
}
- }
- );
+ });
logger.info("Started the transfer " + transferId);
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java b/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java
index b9195b8..b891b05 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java
@@ -20,17 +20,18 @@
import org.apache.airavata.mft.admin.models.TransferState;
import org.apache.airavata.mft.api.service.TransferApiRequest;
import org.apache.airavata.mft.core.*;
-import org.apache.airavata.mft.core.api.Connector;
-import org.apache.airavata.mft.core.api.MetadataCollector;
+import org.apache.airavata.mft.core.api.*;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.List;
+import java.io.File;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Optional;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
public class TransportMediator {
@@ -40,197 +41,209 @@
/*
Number of maximum transfers handled at atime
*/
- private int concurrentTransfers = 10;
- private ExecutorService executor = Executors.newFixedThreadPool(concurrentTransfers * 2); // 2 connections per transfer
- private ExecutorService monitorPool = Executors.newFixedThreadPool(concurrentTransfers * 2); // 2 monitors per transfer
+ private final int concurrentTransfers = 10;
+ private final ExecutorService executor = Executors.newFixedThreadPool(concurrentTransfers * 2); // 2 connections per transfer
+ private final ExecutorService monitorPool = Executors.newFixedThreadPool(concurrentTransfers * 2); // 2 monitors per transfer
+
+ private String tempDataDir = "/tmp";
+
+ public TransportMediator(String tempDataDir) {
+ this.tempDataDir = tempDataDir;
+ }
+
+ public void transferSingleThread(String transferId,
+ TransferApiRequest request,
+ ConnectorConfig srcCC,
+ ConnectorConfig dstCC,
+ BiConsumer<String, TransferState> onStatusCallback,
+ BiConsumer<String, Boolean> exitingCallback) {
+
+ executor.submit(() -> {
+
+ final AtomicBoolean transferInProgress = new AtomicBoolean(true);
+
+ try {
+
+ long start = System.currentTimeMillis();
+
+ onStatusCallback.accept(transferId, new TransferState()
+ .setPercentage(0)
+ .setState("RUNNING")
+ .setUpdateTimeMils(System.currentTimeMillis())
+ .setDescription("Transfer is being processed"));
+
+ Optional<IncomingStreamingConnector> inStreamingConnectorOp = ConnectorResolver
+ .resolveIncomingStreamingConnector(request.getSourceType());
+ Optional<OutgoingStreamingConnector> outStreamingConnectorOp = ConnectorResolver
+ .resolveOutgoingStreamingConnector(request.getDestinationType());
+
+ Optional<IncomingChunkedConnector> inChunkedConnectorOp = ConnectorResolver
+ .resolveIncomingChunkedConnector(request.getSourceType());
+ Optional<OutgoingChunkedConnector> outChunkedConnectorOp = ConnectorResolver
+ .resolveOutgoingChunkedConnector(request.getDestinationType());
+
+ // Give priority for chunked transfers.
+ // TODO: Provide a preference at the API level
+ if (inChunkedConnectorOp.isPresent() && outChunkedConnectorOp.isPresent()) {
+
+ logger.info("Starting the chunked transfer for transfer {}", transferId);
+
+ long chunkSize = 20 * 1024 * 1024L;
+
+ ExecutorService chunkedExecutorService = Executors.newFixedThreadPool(20);
+
+ CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(chunkedExecutorService);
+
+ long fileLength = srcCC.getMetadata().getResourceSize();
+ long uploadLength = 0L;
+ int chunkIdx = 0;
+
+ IncomingChunkedConnector inConnector = inChunkedConnectorOp
+ .orElseThrow(() -> new Exception("Could not find an in chunked connector for type " + request.getSourceType()));
+
+ OutgoingChunkedConnector outConnector = outChunkedConnectorOp
+ .orElseThrow(() -> new Exception("Could not find an out chunked connector for type " + request.getDestinationType()));
+
+ inConnector.init(srcCC);
+ outConnector.init(dstCC);
+
+ while(uploadLength < fileLength) {
+
+ long endPos = uploadLength + chunkSize;
+ if (endPos > fileLength) {
+ endPos = fileLength;
+ }
+
+ String tempFile = tempDataDir + File.separator + transferId + "-" + chunkIdx;
+ completionService.submit(new ChunkMover(inConnector, outConnector, uploadLength, endPos, chunkIdx, tempFile));
+
+ uploadLength = endPos;
+ chunkIdx++;
+ }
+
+
+ for (int i = 0; i < chunkIdx; i++) {
+ completionService.take().get();
+ }
+
+ logger.info("Completed chunked transfer for transfer {}", transferId);
+
+ } else if (inStreamingConnectorOp.isPresent() && outStreamingConnectorOp.isPresent()) {
+
+ logger.info("Starting streaming transfer for transfer {}", transferId);
+ IncomingStreamingConnector inConnector = inStreamingConnectorOp
+ .orElseThrow(() -> new Exception("Could not find an in streaming connector for type " + request.getSourceType()));
+
+ OutgoingStreamingConnector outConnector = outStreamingConnectorOp
+ .orElseThrow(() -> new Exception("Could not find an out streaming connector for type " + request.getDestinationType()));
+
+ inConnector.init(srcCC);
+ outConnector.init(dstCC);
+
+ String srcChild = request.getSourceChildResourcePath();
+ String dstChild = request.getDestinationChildResourcePath();
+
+ InputStream inputStream = srcChild.equals("") ? inConnector.fetchInputStream() : inConnector.fetchInputStream(srcChild);
+ OutputStream outputStream = dstChild.equals("") ? outConnector.fetchOutputStream() : outConnector.fetchOutputStream(dstChild);
+
+ long count = 0;
+ final AtomicLong countAtomic = new AtomicLong();
+ countAtomic.set(count);
+
+ monitorPool.submit(() -> {
+ while (true) {
+ try {
+ Thread.sleep(2000);
+ } catch (InterruptedException e) {
+ // Ignore
+ }
+ if (!transferInProgress.get()) {
+ logger.info("Status monitor is exiting for transfer {}", transferId);
+ break;
+ }
+ double transferPercentage = countAtomic.get() * 100.0 / srcCC.getMetadata().getResourceSize();
+ logger.info("Transfer percentage for transfer {} {}", transferId, transferPercentage);
+ onStatusCallback.accept(transferId, new TransferState()
+ .setPercentage(transferPercentage)
+ .setState("RUNNING")
+ .setUpdateTimeMils(System.currentTimeMillis())
+ .setDescription("Transfer Progress Updated"));
+ }
+ });
+
+ int n;
+ byte[] buffer = new byte[128 * 1024];
+ for (count = 0L; -1 != (n = inputStream.read(buffer)); count += (long) n) {
+ outputStream.write(buffer, 0, n);
+ countAtomic.set(count);
+ }
+
+ inConnector.complete();
+ outConnector.complete();
+
+ logger.info("Completed streaming transfer for transfer {}", transferId);
+
+ } else {
+ throw new Exception("No matching connector found to perform the transfer");
+ }
+
+ long time = (System.currentTimeMillis() - start) / 1000;
+
+ logger.info("Transfer {} completed. Time {} S. Speed {} MB/s", transferId, time,
+ (srcCC.getMetadata().getResourceSize() * 1.0 / time) / (1024 * 1024));
+
+ onStatusCallback.accept(transferId, new TransferState()
+ .setPercentage(100)
+ .setState("COMPLETED")
+ .setUpdateTimeMils(System.currentTimeMillis())
+ .setDescription("Transfer successfully completed"));
+
+ exitingCallback.accept(transferId, true);
+ } catch (Exception e) {
+
+ logger.error("Transfer {} failed with error", transferId, e);
+
+ onStatusCallback.accept(transferId, new TransferState()
+ .setPercentage(0)
+ .setState("FAILED")
+ .setUpdateTimeMils(System.currentTimeMillis())
+ .setDescription("Transfer failed due to " + ExceptionUtils.getStackTrace(e)));
+ } finally {
+ transferInProgress.set(false);
+ }
+ });
+ }
public void destroy() {
executor.shutdown();
+ monitorPool.shutdown();
}
- public void transfer(String transferId, TransferApiRequest request, Connector inConnector, Connector outConnector, MetadataCollector srcMetadataCollector,
- MetadataCollector destMetadataCollector, BiConsumer<String, TransferState> onStatusCallback,
- BiConsumer<String, Boolean> exitingCallback) throws Exception {
+ private static class ChunkMover implements Callable<Integer> {
- FileResourceMetadata srcMetadata = srcMetadataCollector.getFileResourceMetadata(
- request.getMftAuthorizationToken(),
- request.getSourceResourceId(),
- request.getSourceToken());
+ IncomingChunkedConnector downloader;
+ OutgoingChunkedConnector uploader;
+ long uploadLength;
+ long endPos;
+ int chunkIdx;
+ String tempFile;
- final long resourceSize = srcMetadata.getResourceSize();
- logger.debug("Source file size {}. MD5 {}", resourceSize, srcMetadata.getMd5sum());
+ public ChunkMover(IncomingChunkedConnector downloader, OutgoingChunkedConnector uploader, long uploadLength,
+ long endPos, int chunkIdx, String tempFile) {
+ this.downloader = downloader;
+ this.uploader = uploader;
+ this.uploadLength = uploadLength;
+ this.endPos = endPos;
+ this.chunkIdx = chunkIdx;
+ this.tempFile = tempFile;
+ }
- final DoubleStreamingBuffer streamBuffer = new DoubleStreamingBuffer();
- final ReentrantLock statusLock = new ReentrantLock();
-
- ConnectorContext context = new ConnectorContext();
- context.setMetadata(srcMetadata);
- context.setStreamBuffer(streamBuffer);
- context.setTransferId(transferId);
-
- TransferTask recvTask = new TransferTask(request.getMftAuthorizationToken(), request.getSourceResourceId(),
- request.getSourceChildResourcePath(), request.getSourceToken(), context, inConnector);
- TransferTask sendTask = new TransferTask(request.getMftAuthorizationToken(), request.getDestinationResourceId(),
- request.getDestinationChildResourcePath(), request.getDestinationToken(), context, outConnector);
- List<Future<Integer>> futureList = new ArrayList<>();
-
- ExecutorCompletionService<Integer> completionService = new ExecutorCompletionService<>(executor);
-
- long startTime = System.nanoTime();
-
- futureList.add(completionService.submit(recvTask));
- futureList.add(completionService.submit(sendTask));
-
- final AtomicBoolean transferInProgress = new AtomicBoolean(true);
- final AtomicBoolean transferSuccess = new AtomicBoolean(true);
-
-
- // Monitoring the completeness of the transfer
- Thread monitorThread = new Thread(new Runnable() {
- @Override
- public void run() {
-
- try {
- int futureCnt = futureList.size();
- boolean transferErrored = false;
-
- for (int i = 0; i < futureCnt; i++) {
- Future<Integer> ft = completionService.take();
- futureList.remove(ft);
- try {
- ft.get();
- } catch (Exception e) {
-
- logger.error("One task failed with error", e);
- transferErrored = true;
- statusLock.lock();
- onStatusCallback.accept(transferId, new TransferState()
- .setPercentage(0)
- .setState("FAILED")
- .setUpdateTimeMils(System.currentTimeMillis())
- .setDescription("Transfer failed due to " + ExceptionUtils.getStackTrace(e)));
- transferInProgress.set(false);
- transferSuccess.set(false);
- statusLock.unlock();
-
- for (Future<Integer> f : futureList) {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException ex) {
- logger.error("Sleep failed", e);
- }
- f.cancel(true);
- }
- futureList.clear();
- }
- }
-
- if (!transferErrored) {
- Boolean transferred = destMetadataCollector.isAvailable(
- request.getMftAuthorizationToken(),
- request.getDestinationResourceId(),
- request.getDestinationToken());
-
-
- if (!transferred) {
- logger.error("Transfer completed but resource is not available in destination");
- throw new Exception("Transfer completed but resource is not available in destination");
- }
-
- FileResourceMetadata destMetadata = destMetadataCollector.getFileResourceMetadata(
- request.getMftAuthorizationToken(),
- request.getDestinationResourceId(),
- request.getDestinationToken());
-
-
- boolean doIntegrityVerify = true;
-
- if (srcMetadata.getMd5sum() == null) {
- logger.warn("MD5 sum is not available for source resource. So this disables integrity verification");
- doIntegrityVerify = false;
- } else if (destMetadata.getMd5sum() == null) {
- logger.warn("MD5 sum is not available for destination resource. So this disables integrity verification");
- doIntegrityVerify = false;
- }
-
- if (doIntegrityVerify && !destMetadata.getMd5sum().equals(srcMetadata.getMd5sum())) {
- logger.error("Resource integrity violated. MD5 sums are not matching. Source md5 {} destination md5 {}",
- srcMetadata.getMd5sum(), destMetadata.getMd5sum());
- throw new Exception("Resource integrity violated. MD5 sums are not matching. Source md5 " + srcMetadata.getMd5sum()
- + " destination md5 " + destMetadata.getMd5sum());
- }
-
- // Check
-
- long endTime = System.nanoTime();
-
- double time = (endTime - startTime) * 1.0 / 1000000000;
-
- statusLock.lock();
- onStatusCallback.accept(transferId, new TransferState()
- .setPercentage(100)
- .setState("COMPLETED")
- .setUpdateTimeMils(System.currentTimeMillis())
- .setDescription("Transfer successfully completed"));
- transferInProgress.set(false);
- transferSuccess.set(true);
- statusLock.unlock();
-
- logger.info("Transfer {} completed. Speed {} MB/s", transferId,
- (srcMetadata.getResourceSize() * 1.0 / time) / (1024 * 1024));
- }
- } catch (Exception e) {
-
- statusLock.lock();
- onStatusCallback.accept(transferId, new TransferState()
- .setPercentage(0)
- .setState("FAILED")
- .setUpdateTimeMils(System.currentTimeMillis())
- .setDescription("Transfer failed due to " + ExceptionUtils.getStackTrace(e)));
- transferInProgress.set(false);
- transferSuccess.set(false);
- statusLock.unlock();
-
- logger.error("Transfer {} failed", transferId, e);
- } finally {
- inConnector.destroy();
- outConnector.destroy();
- transferInProgress.set(false);
- exitingCallback.accept(transferId,transferSuccess.get());
- }
- }
- });
-
- // Monitoring the status of the transfer
- Thread progressThread = new Thread(new Runnable() {
- @Override
- public void run() {
- while (true) {
-
- try {
- Thread.sleep(2000);
- } catch (InterruptedException e) {
- // Ignore
- }
- statusLock.lock();
- if (!transferInProgress.get()){
- statusLock.unlock();
- logger.info("Status monitor is exiting for transfer {}", transferId);
- break;
- }
- double transferPercentage = streamBuffer.getProcessedBytes() * 100.0/ resourceSize;
- logger.info("Transfer percentage for transfer {} {}", transferId, transferPercentage);
- onStatusCallback.accept(transferId, new TransferState()
- .setPercentage(transferPercentage)
- .setState("RUNNING")
- .setUpdateTimeMils(System.currentTimeMillis())
- .setDescription("Transfer Progress Updated"));
- statusLock.unlock();
- }
- }
- });
-
- monitorPool.submit(monitorThread);
- monitorPool.submit(progressThread);
+ @Override
+ public Integer call() throws Exception {
+ downloader.downloadChunk(chunkIdx, uploadLength, endPos, tempFile);
+ uploader.uploadChunk(chunkIdx, uploadLength, endPos, tempFile);
+ new File(tempFile).delete();
+ return chunkIdx;
+ }
}
}
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/http/AgentHttpDownloadData.java b/agent/src/main/java/org/apache/airavata/mft/agent/http/AgentHttpDownloadData.java
new file mode 100644
index 0000000..db7a444
--- /dev/null
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/http/AgentHttpDownloadData.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.agent.http;
+
+import org.apache.airavata.mft.core.api.ConnectorConfig;
+import org.apache.airavata.mft.core.api.IncomingChunkedConnector;
+import org.apache.airavata.mft.core.api.IncomingStreamingConnector;
+
+public class AgentHttpDownloadData {
+ private IncomingStreamingConnector incomingStreamingConnector;
+ private IncomingChunkedConnector incomingChunkedConnector;
+ private ConnectorConfig connectorConfig;
+ private String childResourcePath;
+ private long createdTime = System.currentTimeMillis();
+
+ public IncomingStreamingConnector getIncomingStreamingConnector() {
+ return incomingStreamingConnector;
+ }
+
+ public void setIncomingStreamingConnector(IncomingStreamingConnector incomingStreamingConnector) {
+ this.incomingStreamingConnector = incomingStreamingConnector;
+ }
+
+ public IncomingChunkedConnector getIncomingChunkedConnector() {
+ return incomingChunkedConnector;
+ }
+
+ public void setIncomingChunkedConnector(IncomingChunkedConnector incomingChunkedConnector) {
+ this.incomingChunkedConnector = incomingChunkedConnector;
+ }
+
+ public ConnectorConfig getConnectorConfig() {
+ return connectorConfig;
+ }
+
+ public void setConnectorConfig(ConnectorConfig connectorConfig) {
+ this.connectorConfig = connectorConfig;
+ }
+
+ public String getChildResourcePath() {
+ return childResourcePath;
+ }
+
+ public void setChildResourcePath(String childResourcePath) {
+ this.childResourcePath = childResourcePath;
+ }
+
+ public long getCreatedTime() {
+ return createdTime;
+ }
+
+ public void setCreatedTime(long createdTime) {
+ this.createdTime = createdTime;
+ }
+
+
+ public static final class AgentHttpDownloadDataBuilder {
+ private IncomingStreamingConnector incomingStreamingConnector;
+ private IncomingChunkedConnector incomingChunkedConnector;
+ private ConnectorConfig connectorConfig;
+ private String childResourcePath;
+ private long createdTime = System.currentTimeMillis();
+
+ private AgentHttpDownloadDataBuilder() {
+ }
+
+ public static AgentHttpDownloadDataBuilder newBuilder() {
+ return new AgentHttpDownloadDataBuilder();
+ }
+
+ public AgentHttpDownloadDataBuilder withIncomingStreamingConnector(IncomingStreamingConnector incomingConnector) {
+ this.incomingStreamingConnector = incomingConnector;
+ return this;
+ }
+
+ public AgentHttpDownloadDataBuilder withIncomingChunkedConnector(IncomingChunkedConnector incomingConnector) {
+ this.incomingChunkedConnector = incomingConnector;
+ return this;
+ }
+
+ public AgentHttpDownloadDataBuilder withConnectorConfig(ConnectorConfig connectorConfig) {
+ this.connectorConfig = connectorConfig;
+ return this;
+ }
+
+ public AgentHttpDownloadDataBuilder withChildResourcePath(String childResourcePath) {
+ this.childResourcePath = childResourcePath;
+ return this;
+ }
+
+ public AgentHttpDownloadDataBuilder withCreatedTime(long createdTime) {
+ this.createdTime = createdTime;
+ return this;
+ }
+
+
+ public AgentHttpDownloadData build() {
+ AgentHttpDownloadData agentHttpDownloadData = new AgentHttpDownloadData();
+ agentHttpDownloadData.setIncomingStreamingConnector(incomingStreamingConnector);
+ agentHttpDownloadData.setIncomingChunkedConnector(incomingChunkedConnector);
+ agentHttpDownloadData.setConnectorConfig(connectorConfig);
+ agentHttpDownloadData.setChildResourcePath(childResourcePath);
+ agentHttpDownloadData.setCreatedTime(createdTime);
+ return agentHttpDownloadData;
+ }
+ }
+}
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/http/ConnectorParams.java b/agent/src/main/java/org/apache/airavata/mft/agent/http/ConnectorParams.java
deleted file mode 100644
index 99a4b38..0000000
--- a/agent/src/main/java/org/apache/airavata/mft/agent/http/ConnectorParams.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.airavata.mft.agent.http;
-
-public class ConnectorParams {
-
- private String resourceServiceHost, secretServiceHost;
- private int resourceServicePort, secretServicePort;
-
- public String getResourceServiceHost() {
- return resourceServiceHost;
- }
-
- public ConnectorParams setResourceServiceHost(String resourceServiceHost) {
- this.resourceServiceHost = resourceServiceHost;
- return this;
- }
-
- public String getSecretServiceHost() {
- return secretServiceHost;
- }
-
- public ConnectorParams setSecretServiceHost(String secretServiceHost) {
- this.secretServiceHost = secretServiceHost;
- return this;
- }
-
- public int getResourceServicePort() {
- return resourceServicePort;
- }
-
- public ConnectorParams setResourceServicePort(int resourceServicePort) {
- this.resourceServicePort = resourceServicePort;
- return this;
- }
-
- public int getSecretServicePort() {
- return secretServicePort;
- }
-
- public ConnectorParams setSecretServicePort(int secretServicePort) {
- this.secretServicePort = secretServicePort;
- return this;
- }
-}
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpDownloadRequest.java b/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpDownloadRequest.java
deleted file mode 100644
index 2cf56af..0000000
--- a/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpDownloadRequest.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.airavata.mft.agent.http;
-
-import org.apache.airavata.mft.core.api.Connector;
-import org.apache.airavata.mft.core.api.MetadataCollector;
-
-public class HttpDownloadRequest {
-
- private ConnectorParams connectorParams;
- private Connector srcConnector;
- private MetadataCollector srcMetadataCollector;
- private String srcResourceId;
- private String srcToken;
-
- public ConnectorParams getConnectorParams() {
- return connectorParams;
- }
-
- public HttpDownloadRequest setConnectorParams(ConnectorParams connectorParams) {
- this.connectorParams = connectorParams;
- return this;
- }
-
- public Connector getSrcConnector() {
- return srcConnector;
- }
-
- public HttpDownloadRequest setSrcConnector(Connector srcConnector) {
- this.srcConnector = srcConnector;
- return this;
- }
-
- public MetadataCollector getSrcMetadataCollector() {
- return srcMetadataCollector;
- }
-
- public HttpDownloadRequest setSrcMetadataCollector(MetadataCollector srcMetadataCollector) {
- this.srcMetadataCollector = srcMetadataCollector;
- return this;
- }
-
- public String getSrcResourceId() {
- return srcResourceId;
- }
-
- public HttpDownloadRequest setSrcResourceId(String srcResourceId) {
- this.srcResourceId = srcResourceId;
- return this;
- }
-
- public String getSrcToken() {
- return srcToken;
- }
-
- public HttpDownloadRequest setSrcToken(String srcToken) {
- this.srcToken = srcToken;
- return this;
- }
-}
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpServerHandler.java b/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpServerHandler.java
index 07c0191..655eb68 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpServerHandler.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpServerHandler.java
@@ -22,17 +22,11 @@
import io.netty.handler.codec.http.*;
import io.netty.handler.stream.ChunkedStream;
import io.netty.util.CharsetUtil;
-import org.apache.airavata.mft.common.AuthToken;
-import org.apache.airavata.mft.core.*;
-import org.apache.airavata.mft.core.api.Connector;
-import org.apache.airavata.mft.core.api.MetadataCollector;
+import org.apache.airavata.mft.core.api.IncomingStreamingConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.activation.MimetypesFileTypeMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import java.io.InputStream;
import static io.netty.handler.codec.http.HttpMethod.GET;
import static io.netty.handler.codec.http.HttpResponseStatus.*;
@@ -43,7 +37,6 @@
private static final Logger logger = LoggerFactory.getLogger(HttpServerHandler.class);
private final HttpTransferRequestsStore transferRequestsStore;
- private final ExecutorService executor = Executors.newFixedThreadPool(10);
public HttpServerHandler(HttpTransferRequestsStore transferRequestsStore) {
this.transferRequestsStore = transferRequestsStore;
@@ -51,111 +44,89 @@
@Override
public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
- if (!request.decoderResult().isSuccess()) {
- sendError(ctx, BAD_REQUEST);
- return;
- }
- if (request.method() != GET) {
- sendError(ctx, METHOD_NOT_ALLOWED);
- return;
- }
+ try {
+ if (!request.decoderResult().isSuccess()) {
+ sendError(ctx, BAD_REQUEST);
+ return;
+ }
- final String uri = request.uri().substring(1);
- logger.info("Received download request through url {}", uri);
+ if (request.method() != GET) {
+ sendError(ctx, METHOD_NOT_ALLOWED);
+ return;
+ }
- HttpTransferRequest httpTransferRequest = transferRequestsStore.getDownloadRequest(uri);
+ final String uri = request.uri().substring(request.uri().lastIndexOf("/") + 1);
+ logger.info("Received download request through url {}", uri);
- if (httpTransferRequest == null) {
- logger.error("Couldn't find transfer request for uri {}", uri);
- sendError(ctx, NOT_FOUND);
- return;
- }
+ AgentHttpDownloadData downloadData = transferRequestsStore.getDownloadRequest(uri);
- Connector connector = httpTransferRequest.getOtherConnector();
- MetadataCollector metadataCollector = httpTransferRequest.getOtherMetadataCollector();
+ if (downloadData == null) {
+ logger.error("Couldn't find transfer request for uri {}", uri);
+ sendError(ctx, NOT_FOUND);
+ return;
+ }
- ConnectorParams params = httpTransferRequest.getConnectorParams();
+ long fileLength = downloadData.getConnectorConfig().getMetadata().getResourceSize();
- // TODO Load from HTTP Headers
- AuthToken authToken = httpTransferRequest.getAuthToken();
+ HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
+ HttpUtil.setContentLength(response, fileLength);
+ setContentTypeHeader(response, downloadData.getConnectorConfig().getMetadata().getFriendlyName());
- connector.init(params.getResourceServiceHost(),
- params.getResourceServicePort(), params.getSecretServiceHost(), params.getSecretServicePort());
+ if (HttpUtil.isKeepAlive(request)) {
+ response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
+ }
- metadataCollector.init(params.getResourceServiceHost(), params.getResourceServicePort(),
- params.getSecretServiceHost(), params.getSecretServicePort());
+ // Write the initial line and the header.
+ ctx.write(response);
- Boolean available = metadataCollector.isAvailable(authToken,
- httpTransferRequest.getResourceId(), httpTransferRequest.getCredentialToken());
+ // Write the content.
+ ChannelFuture sendFileFuture;
+ ChannelFuture lastContentFuture;
+ // TODO: Support chunked streaming
+ if (downloadData.getIncomingStreamingConnector() == null && downloadData.getIncomingChunkedConnector() != null) {
+ logger.error("Chunked data download is not yet supported in Http transport");
+ throw new Exception("Chunked data download is not yet supported in Http transport");
+ }
- if (!available) {
- logger.error("File resource {} is not available", httpTransferRequest.getResourceId());
- sendError(ctx, NOT_FOUND);
- return;
- }
+ IncomingStreamingConnector incomingStreamingConnector = downloadData.getIncomingStreamingConnector();
+ incomingStreamingConnector.init(downloadData.getConnectorConfig());
+ InputStream inputStream = downloadData.getChildResourcePath().equals("")?
+ incomingStreamingConnector.fetchInputStream() :
+ incomingStreamingConnector.fetchInputStream(downloadData.getChildResourcePath());
- FileResourceMetadata fileResourceMetadata = metadataCollector.getFileResourceMetadata(authToken,
- httpTransferRequest.getResourceId(),
- httpTransferRequest.getChildResourcePath(),
- httpTransferRequest.getCredentialToken());
+ sendFileFuture = ctx.writeAndFlush(new HttpChunkedInput(new ChunkedStream(inputStream)),
+ ctx.newProgressivePromise());
- long fileLength = fileResourceMetadata.getResourceSize();
+ // HttpChunkedInput will write the end marker (LastHttpContent) for us.
+ lastContentFuture = sendFileFuture;
- HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
- HttpUtil.setContentLength(response, fileLength);
- setContentTypeHeader(response, httpTransferRequest.getResourceId());
-
- if (HttpUtil.isKeepAlive(request)) {
- response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
- }
-
- // Write the initial line and the header.
- ctx.write(response);
-
- // Write the content.
- ChannelFuture sendFileFuture;
- ChannelFuture lastContentFuture;
-
- ConnectorContext connectorContext = new ConnectorContext();
- connectorContext.setStreamBuffer(new DoubleStreamingBuffer());
- connectorContext.setTransferId(uri);
- connectorContext.setMetadata(new FileResourceMetadata()); // TODO Resolve
-
- TransferTask pullTask = new TransferTask(authToken, httpTransferRequest.getResourceId(),
- httpTransferRequest.getChildResourcePath(), httpTransferRequest.getCredentialToken(),
- connectorContext, connector);
-
- // TODO aggregate pullStatusFuture and sendFileFuture for keepalive test
- Future<Integer> pullStatusFuture = executor.submit(pullTask);
-
- sendFileFuture = ctx.writeAndFlush(new HttpChunkedInput(new ChunkedStream(connectorContext.getStreamBuffer().getInputStream())),
- ctx.newProgressivePromise());
-
- // HttpChunkedInput will write the end marker (LastHttpContent) for us.
- lastContentFuture = sendFileFuture;
-
- sendFileFuture.addListener(new ChannelProgressiveFutureListener() {
- @Override
- public void operationProgressed(ChannelProgressiveFuture future, long progress, long total) {
- if (total < 0) { // total unknown
- logger.error(future.channel() + " Transfer progress: " + progress);
- } else {
- logger.error(future.channel() + " Transfer progress: " + progress + " / " + total);
+ sendFileFuture.addListener(new ChannelProgressiveFutureListener() {
+ @Override
+ public void operationProgressed(ChannelProgressiveFuture future, long progress, long total) {
+ if (total < 0) { // total unknown
+ logger.debug(future.channel() + " Transfer progress: " + progress);
+ } else {
+ logger.debug(future.channel() + " Transfer progress: " + progress + " / " + total);
+ }
}
+
+ @Override
+ public void operationComplete(ChannelProgressiveFuture future) {
+ logger.info(future.channel() + " Transfer complete.");
+ }
+ });
+
+ // Decide whether to close the connection or not.
+ if (!HttpUtil.isKeepAlive(request)) {
+ // Close the connection when the whole content is written out.
+ lastContentFuture.addListener(ChannelFutureListener.CLOSE);
}
- @Override
- public void operationComplete(ChannelProgressiveFuture future) {
- System.err.println(future.channel() + " Transfer complete.");
- }
- });
-
- // Decide whether to close the connection or not.
- if (!HttpUtil.isKeepAlive(request)) {
- // Close the connection when the whole content is written out.
- lastContentFuture.addListener(ChannelFutureListener.CLOSE);
+ } catch (Exception e) {
+ logger.error("Errored while processing HTTP download request", e);
+ sendError(ctx, INTERNAL_SERVER_ERROR);
}
}
@@ -185,7 +156,7 @@
}
private static void setContentTypeHeader(HttpResponse response, String path) {
- MimetypesFileTypeMap mimeTypesMap = new MimetypesFileTypeMap();
- response.headers().set(HttpHeaderNames.CONTENT_TYPE, path);
+ response.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/octet-stream");
+ response.headers().set(HttpHeaderNames.CONTENT_DISPOSITION, "attachment; filename=\"" + path+ "\"");
}
}
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpTransferRequest.java b/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpTransferRequest.java
deleted file mode 100644
index c6d5192..0000000
--- a/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpTransferRequest.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.airavata.mft.agent.http;
-
-import org.apache.airavata.mft.common.AuthToken;
-import org.apache.airavata.mft.core.api.Connector;
-import org.apache.airavata.mft.core.api.MetadataCollector;
-
-public class HttpTransferRequest {
- private Connector otherConnector;
- private MetadataCollector otherMetadataCollector;
- private ConnectorParams connectorParams;
- private String resourceId;
- private String childResourcePath;
- private String credentialToken;
- private long createdTime = System.currentTimeMillis();
- private AuthToken authToken;
-
- public Connector getOtherConnector() {
- return otherConnector;
- }
-
- public HttpTransferRequest setOtherConnector(Connector otherConnector) {
- this.otherConnector = otherConnector;
- return this;
- }
-
- public MetadataCollector getOtherMetadataCollector() {
- return otherMetadataCollector;
- }
-
- public HttpTransferRequest setOtherMetadataCollector(MetadataCollector otherMetadataCollector) {
- this.otherMetadataCollector = otherMetadataCollector;
- return this;
- }
-
- public String getResourceId() {
- return resourceId;
- }
-
- public HttpTransferRequest setResourceId(String resourceId) {
- this.resourceId = resourceId;
- return this;
- }
-
- public String getChildResourcePath() {
- return childResourcePath;
- }
-
- public HttpTransferRequest setChildResourcePath(String childResourcePath) {
- this.childResourcePath = childResourcePath;
- return this;
- }
-
- public String getCredentialToken() {
- return credentialToken;
- }
-
- public HttpTransferRequest setCredentialToken(String credentialToken) {
- this.credentialToken = credentialToken;
- return this;
- }
-
- public ConnectorParams getConnectorParams() {
- return connectorParams;
- }
-
- public HttpTransferRequest setConnectorParams(ConnectorParams connectorParams) {
- this.connectorParams = connectorParams;
- return this;
- }
-
- public long getCreatedTime() {
- return createdTime;
- }
-
- public HttpTransferRequest setCreatedTime(long createdTime) {
- this.createdTime = createdTime;
- return this;
- }
-
- public AuthToken getAuthToken() {
- return authToken;
- }
-
- public HttpTransferRequest setAuthToken(AuthToken authToken) {
- this.authToken = authToken;
- return this;
- }
-}
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpTransferRequestsStore.java b/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpTransferRequestsStore.java
index dd2f17e..56c468b 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpTransferRequestsStore.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpTransferRequestsStore.java
@@ -31,8 +31,8 @@
private static final Logger logger = LoggerFactory.getLogger(HttpTransferRequestsStore.class);
- final private Map<String, HttpTransferRequest> downloadRequestStore = new ConcurrentHashMap<>();
- final private Map<String, HttpTransferRequest> uploadRequestStore = new ConcurrentHashMap<>();
+ final private Map<String, AgentHttpDownloadData> downloadRequestStore = new ConcurrentHashMap<>();
+ final private Map<String, AgentHttpDownloadData> uploadRequestStore = new ConcurrentHashMap<>();
final private ScheduledExecutorService monitor = Executors.newSingleThreadScheduledExecutor();
private long entryExpiryTimeMS = 300 * 1000;
@@ -56,32 +56,32 @@
}, 2, 10, TimeUnit.SECONDS);
}
- public String addDownloadRequest(HttpTransferRequest request) {
+ public String addDownloadRequest(AgentHttpDownloadData request) {
String randomUrl = UUID.randomUUID().toString();
downloadRequestStore.put(randomUrl, request);
return randomUrl;
}
- public HttpTransferRequest getDownloadRequest(String url) {
+ public AgentHttpDownloadData getDownloadRequest(String url) {
//TODO Need to block concurrent calls to same url as connectors are not thread safe
- HttpTransferRequest request = downloadRequestStore.get(url);
+ AgentHttpDownloadData request = downloadRequestStore.get(url);
if (request != null) {
downloadRequestStore.remove(url);
}
return request;
}
- public String addUploadRequest(HttpTransferRequest request) {
+ public String addUploadRequest(AgentHttpDownloadData request) {
String randomUrl = UUID.randomUUID().toString();
uploadRequestStore.put(randomUrl, request);
return randomUrl;
}
- public HttpTransferRequest getUploadRequest(String url) {
+ public AgentHttpDownloadData getUploadRequest(String url) {
//TODO Need to block concurrent calls to same url as connectors are not thread safe
- HttpTransferRequest request = uploadRequestStore.get(url);
+ AgentHttpDownloadData request = uploadRequestStore.get(url);
if (request != null) {
uploadRequestStore.remove(url);
}
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/rpc/RPCParser.java b/agent/src/main/java/org/apache/airavata/mft/agent/rpc/RPCParser.java
index 7cf638b..03466d4 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/rpc/RPCParser.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/rpc/RPCParser.java
@@ -21,15 +21,16 @@
import com.google.protobuf.util.JsonFormat;
import org.apache.airavata.mft.admin.models.rpc.SyncRPCRequest;
import org.apache.airavata.mft.admin.models.rpc.SyncRPCResponse;
-import org.apache.airavata.mft.agent.http.ConnectorParams;
-import org.apache.airavata.mft.agent.http.HttpTransferRequest;
+import org.apache.airavata.mft.agent.http.AgentHttpDownloadData;
import org.apache.airavata.mft.agent.http.HttpTransferRequestsStore;
import org.apache.airavata.mft.common.AuthToken;
import org.apache.airavata.mft.core.ConnectorResolver;
import org.apache.airavata.mft.core.DirectoryResourceMetadata;
import org.apache.airavata.mft.core.FileResourceMetadata;
import org.apache.airavata.mft.core.MetadataCollectorResolver;
-import org.apache.airavata.mft.core.api.Connector;
+import org.apache.airavata.mft.core.api.ConnectorConfig;
+import org.apache.airavata.mft.core.api.IncomingChunkedConnector;
+import org.apache.airavata.mft.core.api.IncomingStreamingConnector;
import org.apache.airavata.mft.core.api.MetadataCollector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,14 +54,8 @@
@org.springframework.beans.factory.annotation.Value("${secret.service.port}")
private int secretServicePort;
- @org.springframework.beans.factory.annotation.Value("${agent.advertised.host}")
- private String agentAdvertisedHost;
-
- @org.springframework.beans.factory.annotation.Value("${agent.http.port}")
- private Integer agentHttpPort;
-
- @org.springframework.beans.factory.annotation.Value("${agent.https.enabled}")
- private boolean agentHttpsEnabled;
+ @org.springframework.beans.factory.annotation.Value("${agent.advertised.url}")
+ private String agentAdvertisedUrl;
@Autowired
private HttpTransferRequestsStore httpTransferRequestsStore;
@@ -68,6 +63,7 @@
public String resolveRPCRequest(SyncRPCRequest request) throws Exception {
// TODO implement using the reflection
ObjectMapper mapper = new ObjectMapper();
+ logger.info("Accepting sync request {} for method {}", request.getRequestId(), request.getMethod());
switch (request.getMethod()) {
case "getFileResourceMetadata":
@@ -159,25 +155,44 @@
mftAuthorizationToken = tokenBuilder.build();
metadataCollectorOp = MetadataCollectorResolver.resolveMetadataCollector(storeType);
- Optional<Connector> connectorOp = ConnectorResolver.resolveConnector(storeType, "IN");
+ Optional<IncomingStreamingConnector> connectorStreamingOp = ConnectorResolver.resolveIncomingStreamingConnector(storeType);
+ Optional<IncomingChunkedConnector> connectorChunkedOp = ConnectorResolver.resolveIncomingChunkedConnector(storeType);
- if (metadataCollectorOp.isPresent() && connectorOp.isPresent()) {
- HttpTransferRequest transferRequest = new HttpTransferRequest()
- .setConnectorParams(new ConnectorParams()
- .setResourceServiceHost(resourceServiceHost)
- .setResourceServicePort(resourceServicePort)
- .setSecretServiceHost(secretServiceHost)
- .setSecretServicePort(secretServicePort))
- .setResourceId(resourceId)
- .setChildResourcePath(childResourcePath)
- .setCredentialToken(sourceToken)
- .setOtherMetadataCollector(metadataCollectorOp.get())
- .setOtherConnector(connectorOp.get())
- .setAuthToken(mftAuthorizationToken);
- String url = httpTransferRequestsStore.addDownloadRequest(transferRequest);
- return (agentHttpsEnabled? "https": "http") + "://" + agentAdvertisedHost + ":" + agentHttpPort + "/" + url;
+ if (metadataCollectorOp.isPresent() && (connectorStreamingOp.isPresent() || connectorChunkedOp.isPresent())) {
+
+ MetadataCollector metadataCollector = metadataCollectorOp.get();
+ metadataCollector.init(resourceServiceHost, resourceServicePort, secretServiceHost, secretServicePort);
+
+ FileResourceMetadata fileResourceMetadata = metadataCollector.getFileResourceMetadata(
+ mftAuthorizationToken,
+ resourceId,
+ childResourcePath,
+ sourceToken);
+
+ AgentHttpDownloadData.AgentHttpDownloadDataBuilder agentHttpDownloadDataBuilder = AgentHttpDownloadData.AgentHttpDownloadDataBuilder.newBuilder()
+ .withChildResourcePath(childResourcePath)
+ .withConnectorConfig(ConnectorConfig.ConnectorConfigBuilder.newBuilder()
+ .withResourceServiceHost(resourceServiceHost)
+ .withResourceServicePort(resourceServicePort)
+ .withSecretServiceHost(secretServiceHost)
+ .withSecretServicePort(secretServicePort)
+ .withResourceId(resourceId)
+ .withCredentialToken(sourceToken)
+ .withAuthToken(mftAuthorizationToken)
+ .withMetadata(fileResourceMetadata).build());
+
+ connectorStreamingOp.ifPresent(agentHttpDownloadDataBuilder::withIncomingStreamingConnector);
+ connectorChunkedOp.ifPresent(agentHttpDownloadDataBuilder::withIncomingChunkedConnector);
+
+ AgentHttpDownloadData downloadData = agentHttpDownloadDataBuilder.build();
+
+ String url = httpTransferRequestsStore.addDownloadRequest(downloadData);
+
+ return (agentAdvertisedUrl.endsWith("/")? agentAdvertisedUrl : agentAdvertisedUrl + "/") + url;
+ } else {
+ logger.error("Metadata collector or connector is not available for store type {}", storeType);
+ throw new Exception("Metadata collector or connector is not available for store type " + storeType);
}
- break;
}
logger.error("Unknown method type specified {}", request.getMethod());
throw new Exception("Unknown method " + request.getMethod());
diff --git a/agent/src/main/resources/application.properties b/agent/src/main/resources/application.properties
index d5949da..4d8d87c 100644
--- a/agent/src/main/resources/application.properties
+++ b/agent/src/main/resources/application.properties
@@ -31,4 +31,5 @@
resource.service.host=localhost
resource.service.port=7002
secret.service.host=localhost
-secret.service.port=7003
\ No newline at end of file
+secret.service.port=7003
+agent.advertised.url=http://localhost:3333
\ No newline at end of file
diff --git a/agent/src/main/resources/distribution/conf/application.properties b/agent/src/main/resources/distribution/conf/application.properties
index d5949da..1b00a6c 100644
--- a/agent/src/main/resources/distribution/conf/application.properties
+++ b/agent/src/main/resources/distribution/conf/application.properties
@@ -19,7 +19,7 @@
agent.id=agent0
agent.secret=CHANGE_ME
agent.host=localhost
-agent.advertised.host=localhost
+agent.advertised.url=http://localhost:3333
agent.user=dimuthu
agent.http.port=3333
agent.https.enabled=false
diff --git a/api/client/src/main/java/org/apache/airavata/mft/api/client/MFTApiClient.java b/api/client/src/main/java/org/apache/airavata/mft/api/client/MFTApiClient.java
index 82bfcbb..d103849 100644
--- a/api/client/src/main/java/org/apache/airavata/mft/api/client/MFTApiClient.java
+++ b/api/client/src/main/java/org/apache/airavata/mft/api/client/MFTApiClient.java
@@ -21,24 +21,28 @@
import io.grpc.ManagedChannelBuilder;
import org.apache.airavata.mft.api.service.*;
+import java.io.Closeable;
+import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-public class MFTApiClient {
- private static Map<String, Map<Integer, MFTApiServiceGrpc.MFTApiServiceBlockingStub>> stubCache = new ConcurrentHashMap<>();
+public class MFTApiClient implements Closeable {
- public static MFTApiServiceGrpc.MFTApiServiceBlockingStub buildClient(String hostName, int port) {
+ private final ManagedChannel channel;
- if (stubCache.containsKey(hostName)) {
- if (stubCache.get(hostName).containsKey(port)) {
- return stubCache.get(hostName).get(port);
- }
+ public MFTApiClient(String hostName, int port) {
+ channel = ManagedChannelBuilder.forAddress(hostName, port).usePlaintext().build();
+ }
+
+ public MFTApiServiceGrpc.MFTApiServiceBlockingStub get() {
+ return MFTApiServiceGrpc.newBlockingStub(channel);
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (channel != null) {
+ channel.shutdown();
}
-
- ManagedChannel channel = ManagedChannelBuilder.forAddress(hostName, port).usePlaintext().build();
- MFTApiServiceGrpc.MFTApiServiceBlockingStub stub = MFTApiServiceGrpc.newBlockingStub(channel);
- stubCache.put(hostName, Collections.singletonMap(port, stub));
- return stub;
}
}
diff --git a/api/client/src/main/java/org/apache/airavata/mft/api/client/examples/Example.java b/api/client/src/main/java/org/apache/airavata/mft/api/client/examples/Example.java
index ca63e1c..808c913 100644
--- a/api/client/src/main/java/org/apache/airavata/mft/api/client/examples/Example.java
+++ b/api/client/src/main/java/org/apache/airavata/mft/api/client/examples/Example.java
@@ -8,7 +8,7 @@
public class Example {
public static void main(String a[]) {
- MFTApiServiceGrpc.MFTApiServiceBlockingStub mftClient = MFTApiClient.buildClient("localhost", 7004);
+ MFTApiServiceGrpc.MFTApiServiceBlockingStub mftClient = new MFTApiClient("localhost", 7004).get();
mftClient.getResourceAvailability(ResourceAvailabilityRequest.newBuilder()
.setResourceId("a")
.setResourceToken("b")
diff --git a/api/service/src/main/java/org/apache/airavata/mft/api/handler/MFTApiHandler.java b/api/service/src/main/java/org/apache/airavata/mft/api/handler/MFTApiHandler.java
index 29f144b..dbce2d2 100644
--- a/api/service/src/main/java/org/apache/airavata/mft/api/handler/MFTApiHandler.java
+++ b/api/service/src/main/java/org/apache/airavata/mft/api/handler/MFTApiHandler.java
@@ -201,8 +201,13 @@
public void getFileResourceMetadata(FetchResourceMetadataRequest request, StreamObserver<FileMetadataResponse> responseObserver) {
try {
+
+ logger.info("Calling get file resource metadata for resource {}", request.getResourceId());
+
+ String targetAgent = derriveTargetAgent(request.getTargetAgentId());
+
SyncRPCRequest.SyncRPCRequestBuilder requestBuilder = SyncRPCRequest.SyncRPCRequestBuilder.builder()
- .withAgentId(request.getTargetAgentId())
+ .withAgentId(targetAgent)
.withMessageId(UUID.randomUUID().toString())
.withParameter("resourceId", request.getResourceId())
.withParameter("resourceType", request.getResourceType())
@@ -249,8 +254,12 @@
@Override
public void getDirectoryResourceMetadata(FetchResourceMetadataRequest request, StreamObserver<DirectoryMetadataResponse> responseObserver) {
try {
+
+ logger.info("Calling get directory metadata for resource {}", request.getResourceId());
+ String targetAgent = derriveTargetAgent(request.getTargetAgentId());
+
SyncRPCRequest.SyncRPCRequestBuilder requestBuilder = SyncRPCRequest.SyncRPCRequestBuilder.builder()
- .withAgentId(request.getTargetAgentId())
+ .withAgentId(targetAgent)
.withMessageId(UUID.randomUUID().toString())
.withParameter("resourceId", request.getResourceId())
.withParameter("resourceType", request.getResourceType())
diff --git a/core/src/main/java/org/apache/airavata/mft/core/ConnectorResolver.java b/core/src/main/java/org/apache/airavata/mft/core/ConnectorResolver.java
index 469d19e..7a76bcd 100644
--- a/core/src/main/java/org/apache/airavata/mft/core/ConnectorResolver.java
+++ b/core/src/main/java/org/apache/airavata/mft/core/ConnectorResolver.java
@@ -17,103 +17,84 @@
package org.apache.airavata.mft.core;
-import org.apache.airavata.mft.core.api.Connector;
+import org.apache.airavata.mft.core.api.IncomingChunkedConnector;
+import org.apache.airavata.mft.core.api.IncomingStreamingConnector;
+import org.apache.airavata.mft.core.api.OutgoingChunkedConnector;
+import org.apache.airavata.mft.core.api.OutgoingStreamingConnector;
-import java.lang.reflect.InvocationTargetException;
import java.util.Optional;
public final class ConnectorResolver {
- public static Optional<Connector> resolveConnector(String type, String direction) throws ClassNotFoundException, NoSuchMethodException, IllegalAccessException, InvocationTargetException, InstantiationException {
+
+ public static Optional<IncomingStreamingConnector> resolveIncomingStreamingConnector(String type) throws Exception {
String className = null;
switch (type) {
case "SCP":
- switch (direction) {
- case "IN":
- className = "org.apache.airavata.mft.transport.scp.SCPReceiver";
- break;
- case "OUT":
- className = "org.apache.airavata.mft.transport.scp.SCPSender";
- break;
- }
- break;
- case "LOCAL":
- switch (direction) {
- case "IN":
- className = "org.apache.airavata.mft.transport.local.LocalReceiver";
- break;
- case "OUT":
- className = "org.apache.airavata.mft.transport.local.LocalSender";
- break;
- }
- break;
- case "S3":
- switch (direction) {
- case "IN":
- className = "org.apache.airavata.mft.transport.s3.S3Receiver";
- break;
- case "OUT":
- className = "org.apache.airavata.mft.transport.s3.S3Sender";
- break;
- }
- break;
- case "BOX":
- switch (direction) {
- case "IN":
- className = "org.apache.airavata.mft.transport.box.BoxReceiver";
- break;
- case "OUT":
- className = "org.apache.airavata.mft.transport.box.BoxSender";
- break;
- }
- break;
- case "AZURE":
- switch (direction) {
- case "IN":
- className = "org.apache.airavata.mft.transport.azure.AzureReceiver";
- break;
- case "OUT":
- className = "org.apache.airavata.mft.transport.azure.AzureSender";
- break;
- }
- break;
- case "GCS":
- switch (direction) {
- case "IN":
- className = "org.apache.airavata.mft.transport.gcp.GCSReceiver";
- break;
- case "OUT":
- className = "org.apache.airavata.mft.transport.gcp.GCSSender";
- break;
- }
- break;
- case "DROPBOX":
- switch (direction) {
- case "IN":
- className = "org.apache.airavata.mft.transport.dropbox.DropboxReceiver";
- break;
- case "OUT":
- className = "org.apache.airavata.mft.transport.dropbox.DropboxSender";
- break;
- }
- break;
- case "FTP":
- switch (direction) {
- case "IN":
- className = "org.apache.airavata.mft.transport.ftp.FTPReceiver";
- break;
- case "OUT":
- className = "org.apache.airavata.mft.transport.ftp.FTPSender";
- break;
- }
+ className = "org.apache.airavata.mft.transport.scp.SCPIncomingConnector";
break;
}
if (className != null) {
Class<?> aClass = Class.forName(className);
- return Optional.of((Connector) aClass.getDeclaredConstructor().newInstance());
+ return Optional.of((IncomingStreamingConnector) aClass.getDeclaredConstructor().newInstance());
} else {
return Optional.empty();
}
}
+
+ public static Optional<OutgoingStreamingConnector> resolveOutgoingStreamingConnector(String type) throws Exception {
+
+ String className = null;
+ switch (type) {
+ case "SCP":
+ className = "org.apache.airavata.mft.transport.scp.SCPOutgoingConnector";
+ break;
+ case "S3":
+ className = "org.apache.airavata.mft.transport.s3.S3OutgoingConnector"; // FIXME(review): was S3IncomingConnector — wrong direction for an outgoing resolver; confirm S3OutgoingConnector implements OutgoingStreamingConnector
+ break;
+ }
+
+ if (className != null) {
+ Class<?> aClass = Class.forName(className);
+ return Optional.of((OutgoingStreamingConnector) aClass.getDeclaredConstructor().newInstance());
+ } else {
+ return Optional.empty();
+ }
+ }
+
+ public static Optional<IncomingChunkedConnector> resolveIncomingChunkedConnector(String type) throws Exception {
+
+ String className = null;
+ switch (type) {
+ case "S3":
+ className = "org.apache.airavata.mft.transport.s3.S3IncomingConnector";
+ break;
+ }
+
+ if (className != null) {
+ Class<?> aClass = Class.forName(className);
+ return Optional.of((IncomingChunkedConnector) aClass.getDeclaredConstructor().newInstance());
+ } else {
+ return Optional.empty();
+ }
+ }
+
+ public static Optional<OutgoingChunkedConnector> resolveOutgoingChunkedConnector(String type) throws Exception {
+
+ String className = null;
+ switch (type) {
+ case "S3":
+ className = "org.apache.airavata.mft.transport.s3.S3OutgoingConnector";
+ break;
+ }
+
+ if (className != null) {
+ Class<?> aClass = Class.forName(className);
+ return Optional.of((OutgoingChunkedConnector) aClass.getDeclaredConstructor().newInstance());
+ } else {
+ return Optional.empty();
+ }
+ }
+
}
diff --git a/core/src/main/java/org/apache/airavata/mft/core/FileResourceMetadata.java b/core/src/main/java/org/apache/airavata/mft/core/FileResourceMetadata.java
index d61d743..7033d02 100644
--- a/core/src/main/java/org/apache/airavata/mft/core/FileResourceMetadata.java
+++ b/core/src/main/java/org/apache/airavata/mft/core/FileResourceMetadata.java
@@ -91,7 +91,7 @@
private Builder() {
}
- public static Builder getBuilder() {
+ public static Builder newBuilder() {
return new Builder();
}
diff --git a/core/src/main/java/org/apache/airavata/mft/core/TransferTask.java b/core/src/main/java/org/apache/airavata/mft/core/TransferTask.java
deleted file mode 100644
index 42a0e08..0000000
--- a/core/src/main/java/org/apache/airavata/mft/core/TransferTask.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.airavata.mft.core;
-
-import org.apache.airavata.mft.common.AuthToken;
-import org.apache.airavata.mft.core.api.Connector;
-
-import java.util.concurrent.Callable;
-
-public class TransferTask implements Callable<Integer> {
-
- private Connector connector;
- private ConnectorContext context;
- private String resourceId;
- private String childResourcePath;
- private String credentialToken;
- private AuthToken authToken;
-
- public TransferTask(AuthToken authToken, String resourceId, String childResourcePath, String credentialToken,
- ConnectorContext context, Connector connector) {
- this.connector = connector;
- this.context = context;
- this.resourceId = resourceId;
- this.authToken = authToken;
- this.credentialToken = credentialToken;
- this.childResourcePath = childResourcePath;
- }
-
- @Override
- public Integer call() throws Exception {
- if (childResourcePath == null || "".equals(childResourcePath)) {
- this.connector.startStream(authToken, resourceId, credentialToken, context);
- } else {
- this.connector.startStream(authToken, resourceId, childResourcePath, credentialToken, context);
- }
- return 0;
- }
-}
diff --git a/core/src/main/java/org/apache/airavata/mft/core/api/BasicConnector.java b/core/src/main/java/org/apache/airavata/mft/core/api/BasicConnector.java
new file mode 100644
index 0000000..1ca72d6
--- /dev/null
+++ b/core/src/main/java/org/apache/airavata/mft/core/api/BasicConnector.java
@@ -0,0 +1,6 @@
+package org.apache.airavata.mft.core.api;
+
+public interface BasicConnector {
+ public void init(ConnectorConfig connectorConfig) throws Exception;
+ public void complete() throws Exception;
+}
diff --git a/core/src/main/java/org/apache/airavata/mft/core/api/ConnectorConfig.java b/core/src/main/java/org/apache/airavata/mft/core/api/ConnectorConfig.java
new file mode 100644
index 0000000..bc754e8
--- /dev/null
+++ b/core/src/main/java/org/apache/airavata/mft/core/api/ConnectorConfig.java
@@ -0,0 +1,167 @@
+package org.apache.airavata.mft.core.api;
+
+import org.apache.airavata.mft.common.AuthToken;
+import org.apache.airavata.mft.core.FileResourceMetadata;
+
+public class ConnectorConfig {
+ private String resourceServiceHost;
+ private int resourceServicePort;
+ private String secretServiceHost;
+ private int secretServicePort;
+ private String resourceId;
+ private String credentialToken;
+ private AuthToken authToken;
+ private String transferId;
+ private FileResourceMetadata metadata;
+
+ public String getResourceServiceHost() {
+ return resourceServiceHost;
+ }
+
+ public void setResourceServiceHost(String resourceServiceHost) {
+ this.resourceServiceHost = resourceServiceHost;
+ }
+
+ public int getResourceServicePort() {
+ return resourceServicePort;
+ }
+
+ public void setResourceServicePort(int resourceServicePort) {
+ this.resourceServicePort = resourceServicePort;
+ }
+
+ public String getSecretServiceHost() {
+ return secretServiceHost;
+ }
+
+ public void setSecretServiceHost(String secretServiceHost) {
+ this.secretServiceHost = secretServiceHost;
+ }
+
+ public int getSecretServicePort() {
+ return secretServicePort;
+ }
+
+ public void setSecretServicePort(int secretServicePort) {
+ this.secretServicePort = secretServicePort;
+ }
+
+ public String getResourceId() {
+ return resourceId;
+ }
+
+ public void setResourceId(String resourceId) {
+ this.resourceId = resourceId;
+ }
+
+ public String getCredentialToken() {
+ return credentialToken;
+ }
+
+ public void setCredentialToken(String credentialToken) {
+ this.credentialToken = credentialToken;
+ }
+
+ public AuthToken getAuthToken() {
+ return authToken;
+ }
+
+ public void setAuthToken(AuthToken authToken) {
+ this.authToken = authToken;
+ }
+
+ public String getTransferId() {
+ return transferId;
+ }
+
+ public void setTransferId(String transferId) {
+ this.transferId = transferId;
+ }
+
+ public FileResourceMetadata getMetadata() {
+ return metadata;
+ }
+
+ public void setMetadata(FileResourceMetadata metadata) {
+ this.metadata = metadata;
+ }
+
+
+ public static final class ConnectorConfigBuilder {
+ private String resourceServiceHost;
+ private int resourceServicePort;
+ private String secretServiceHost;
+ private int secretServicePort;
+ private String resourceId;
+ private String credentialToken;
+ private AuthToken authToken;
+ private String transferId;
+ private FileResourceMetadata metadata;
+
+ private ConnectorConfigBuilder() {
+ }
+
+ public static ConnectorConfigBuilder newBuilder() {
+ return new ConnectorConfigBuilder();
+ }
+
+ public ConnectorConfigBuilder withResourceServiceHost(String resourceServiceHost) {
+ this.resourceServiceHost = resourceServiceHost;
+ return this;
+ }
+
+ public ConnectorConfigBuilder withResourceServicePort(int resourceServicePort) {
+ this.resourceServicePort = resourceServicePort;
+ return this;
+ }
+
+ public ConnectorConfigBuilder withSecretServiceHost(String secretServiceHost) {
+ this.secretServiceHost = secretServiceHost;
+ return this;
+ }
+
+ public ConnectorConfigBuilder withSecretServicePort(int secretServicePort) {
+ this.secretServicePort = secretServicePort;
+ return this;
+ }
+
+ public ConnectorConfigBuilder withResourceId(String resourceId) {
+ this.resourceId = resourceId;
+ return this;
+ }
+
+ public ConnectorConfigBuilder withCredentialToken(String credentialToken) {
+ this.credentialToken = credentialToken;
+ return this;
+ }
+
+ public ConnectorConfigBuilder withAuthToken(AuthToken authToken) {
+ this.authToken = authToken;
+ return this;
+ }
+
+ public ConnectorConfigBuilder withTransferId(String transferId) {
+ this.transferId = transferId;
+ return this;
+ }
+
+ public ConnectorConfigBuilder withMetadata(FileResourceMetadata metadata) {
+ this.metadata = metadata;
+ return this;
+ }
+
+ public ConnectorConfig build() {
+ ConnectorConfig connectorConfig = new ConnectorConfig();
+ connectorConfig.setResourceServiceHost(resourceServiceHost);
+ connectorConfig.setResourceServicePort(resourceServicePort);
+ connectorConfig.setSecretServiceHost(secretServiceHost);
+ connectorConfig.setSecretServicePort(secretServicePort);
+ connectorConfig.setResourceId(resourceId);
+ connectorConfig.setCredentialToken(credentialToken);
+ connectorConfig.setAuthToken(authToken);
+ connectorConfig.setTransferId(transferId);
+ connectorConfig.setMetadata(metadata);
+ return connectorConfig;
+ }
+ }
+}
diff --git a/core/src/main/java/org/apache/airavata/mft/core/api/IncomingChunkedConnector.java b/core/src/main/java/org/apache/airavata/mft/core/api/IncomingChunkedConnector.java
new file mode 100644
index 0000000..37288c6
--- /dev/null
+++ b/core/src/main/java/org/apache/airavata/mft/core/api/IncomingChunkedConnector.java
@@ -0,0 +1,5 @@
+package org.apache.airavata.mft.core.api;
+
+public interface IncomingChunkedConnector extends BasicConnector {
+ public void downloadChunk(int chunkId, long startByte, long endByte, String downloadFile) throws Exception;
+}
diff --git a/core/src/main/java/org/apache/airavata/mft/core/api/IncomingStreamingConnector.java b/core/src/main/java/org/apache/airavata/mft/core/api/IncomingStreamingConnector.java
new file mode 100644
index 0000000..900b148
--- /dev/null
+++ b/core/src/main/java/org/apache/airavata/mft/core/api/IncomingStreamingConnector.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.core.api;
+
+import java.io.InputStream;
+
+public interface IncomingStreamingConnector extends BasicConnector {
+ public void init(ConnectorConfig connectorConfig) throws Exception;
+ public InputStream fetchInputStream() throws Exception;
+ public InputStream fetchInputStream(String childPath) throws Exception;
+ public void complete() throws Exception;
+}
diff --git a/core/src/main/java/org/apache/airavata/mft/core/api/OutgoingChunkedConnector.java b/core/src/main/java/org/apache/airavata/mft/core/api/OutgoingChunkedConnector.java
new file mode 100644
index 0000000..9a2f3cb
--- /dev/null
+++ b/core/src/main/java/org/apache/airavata/mft/core/api/OutgoingChunkedConnector.java
@@ -0,0 +1,5 @@
+package org.apache.airavata.mft.core.api;
+
+public interface OutgoingChunkedConnector extends BasicConnector {
+ public void uploadChunk(int chunkId, long startByte, long endByte, String uploadFile) throws Exception;
+}
diff --git a/core/src/main/java/org/apache/airavata/mft/core/api/OutgoingStreamingConnector.java b/core/src/main/java/org/apache/airavata/mft/core/api/OutgoingStreamingConnector.java
new file mode 100644
index 0000000..1e2633d
--- /dev/null
+++ b/core/src/main/java/org/apache/airavata/mft/core/api/OutgoingStreamingConnector.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.core.api;
+
+import java.io.OutputStream;
+
+public interface OutgoingStreamingConnector extends BasicConnector {
+ public OutputStream fetchOutputStream() throws Exception;
+ public OutputStream fetchOutputStream(String childPath) throws Exception;
+}
diff --git a/examples/src/main/java/org/apache/airavata/mft/examples/http/DownloadExample.java b/examples/src/main/java/org/apache/airavata/mft/examples/http/DownloadExample.java
index 4e95295..31f8f02 100644
--- a/examples/src/main/java/org/apache/airavata/mft/examples/http/DownloadExample.java
+++ b/examples/src/main/java/org/apache/airavata/mft/examples/http/DownloadExample.java
@@ -28,7 +28,7 @@
public static void main(String args[]) {
AuthToken mftAuthorizationToken = AuthToken.newBuilder().setUserTokenAuth(UserTokenAuth.newBuilder().setToken("43ff79ac-e4f2-473c-9ea1-04eee9509a53").build()).build();
- MFTApiServiceGrpc.MFTApiServiceBlockingStub client = MFTApiClient.buildClient("localhost", 7004);
+ MFTApiServiceGrpc.MFTApiServiceBlockingStub client = new MFTApiClient("localhost", 7004).get();
HttpDownloadApiResponse httpDownloadApiResponse = client.submitHttpDownload(HttpDownloadApiRequest.newBuilder()
.setTargetAgent("agent0")
.setSourceResourceId("remote-ssh-resource")
diff --git a/examples/src/main/java/org/apache/airavata/mft/examples/metadata/SCPExample.java b/examples/src/main/java/org/apache/airavata/mft/examples/metadata/SCPExample.java
index 3cf2ab0..0a95ac1 100644
--- a/examples/src/main/java/org/apache/airavata/mft/examples/metadata/SCPExample.java
+++ b/examples/src/main/java/org/apache/airavata/mft/examples/metadata/SCPExample.java
@@ -8,7 +8,7 @@
public class SCPExample {
public static void main(String args[]) throws Exception {
- MFTApiServiceGrpc.MFTApiServiceBlockingStub client = MFTApiClient.buildClient("localhost", 7004);
+ MFTApiServiceGrpc.MFTApiServiceBlockingStub client = new MFTApiClient("localhost", 7004).get();
// File metadata
long startTime = System.currentTimeMillis();
diff --git a/examples/src/main/java/org/apache/airavata/mft/examples/transfer/LocalExample.java b/examples/src/main/java/org/apache/airavata/mft/examples/transfer/LocalExample.java
index 2869a38..096cba9 100644
--- a/examples/src/main/java/org/apache/airavata/mft/examples/transfer/LocalExample.java
+++ b/examples/src/main/java/org/apache/airavata/mft/examples/transfer/LocalExample.java
@@ -26,7 +26,7 @@
public class LocalExample {
public static void main(String args[]) throws Exception {
- MFTApiServiceGrpc.MFTApiServiceBlockingStub client = MFTApiClient.buildClient("localhost", 7004);
+ MFTApiServiceGrpc.MFTApiServiceBlockingStub client = new MFTApiClient("localhost", 7004).get();
String sourceResourceId = "remote-ssh-resource";
String sourceToken = "local-ssh-cred";
diff --git a/examples/src/main/java/org/apache/airavata/mft/examples/transfer/S3Example.java b/examples/src/main/java/org/apache/airavata/mft/examples/transfer/S3Example.java
index eeb48f9..6894b7e 100644
--- a/examples/src/main/java/org/apache/airavata/mft/examples/transfer/S3Example.java
+++ b/examples/src/main/java/org/apache/airavata/mft/examples/transfer/S3Example.java
@@ -26,7 +26,7 @@
public class S3Example {
public static void main(String args[]) throws Exception {
- MFTApiServiceGrpc.MFTApiServiceBlockingStub client = MFTApiClient.buildClient("localhost", 7004);
+ MFTApiServiceGrpc.MFTApiServiceBlockingStub client = new MFTApiClient("localhost", 7004).get();
String sourceResourceId = "remote-ssh-storage";
String sourceResourcePath = "/tmp/1mb.txt";
diff --git a/examples/src/main/java/org/apache/airavata/mft/examples/transfer/SCPExample.java b/examples/src/main/java/org/apache/airavata/mft/examples/transfer/SCPExample.java
index 6e740b9..c48119b 100644
--- a/examples/src/main/java/org/apache/airavata/mft/examples/transfer/SCPExample.java
+++ b/examples/src/main/java/org/apache/airavata/mft/examples/transfer/SCPExample.java
@@ -26,7 +26,7 @@
public class SCPExample {
public static void main(String args[]) throws Exception {
- MFTApiServiceGrpc.MFTApiServiceBlockingStub client = MFTApiClient.buildClient("localhost", 7004);
+ MFTApiServiceGrpc.MFTApiServiceBlockingStub client = new MFTApiClient("localhost", 7004).get();
String sourceResourceId = "remote-ssh-resource-1";
String sourceResourcePath = "/tmp/1mb.txt";
diff --git a/pom.xml b/pom.xml
index 57ce5a3..567b6be 100755
--- a/pom.xml
+++ b/pom.xml
@@ -122,7 +122,7 @@
<jsch>0.1.55</jsch>
<sshj>0.27.0</sshj>
<mariadb.jdbc>2.5.1</mariadb.jdbc>
- <custos.clients.version>1.0-SNAPSHOT</custos.clients.version>
+ <custos.clients.version>1.1-SNAPSHOT</custos.clients.version>
</properties>
</project>
diff --git a/scripts/build.sh b/scripts/build.sh
index 5bb68ab..2d5e4af 100755
--- a/scripts/build.sh
+++ b/scripts/build.sh
@@ -19,6 +19,7 @@
cd ../
mvn clean install
+rm -rf build
mkdir -p build
cp agent/target/MFT-Agent-0.01-bin.zip build/
cp controller/target/MFT-Controller-0.01-bin.zip build/
@@ -30,4 +31,4 @@
unzip -o build/MFT-Controller-0.01-bin.zip -d build/
unzip -o build/Resource-Service-0.01-bin.zip -d build/
unzip -o build/Secret-Service-0.01-bin.zip -d build/
-unzip -o build/API-Service-0.01-bin.zip -d build/
\ No newline at end of file
+unzip -o build/API-Service-0.01-bin.zip -d build/
diff --git a/services/resource-service/server/pom.xml b/services/resource-service/server/pom.xml
index 3b8712e..1e3e9d0 100644
--- a/services/resource-service/server/pom.xml
+++ b/services/resource-service/server/pom.xml
@@ -68,6 +68,12 @@
<groupId>org.apache.airavata.data.lake</groupId>
<artifactId>drms-stubs</artifactId>
<version>0.01-SNAPSHOT</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j-impl</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
</dependencies>
diff --git a/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/datalake/DatalakeResourceBackend.java b/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/datalake/DatalakeResourceBackend.java
index bd00053..931f6db 100644
--- a/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/datalake/DatalakeResourceBackend.java
+++ b/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/datalake/DatalakeResourceBackend.java
@@ -19,12 +19,16 @@
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
+import org.apache.airavata.datalake.drms.AuthCredentialType;
+import org.apache.airavata.datalake.drms.AuthenticatedUser;
import org.apache.airavata.datalake.drms.DRMSServiceAuthToken;
import org.apache.airavata.datalake.drms.storage.ResourceFetchRequest;
import org.apache.airavata.datalake.drms.storage.ResourceFetchResponse;
import org.apache.airavata.datalake.drms.storage.ResourceServiceGrpc;
import org.apache.airavata.datalake.drms.storage.ssh.SSHStorage;
import org.apache.airavata.mft.common.AuthToken;
+import org.apache.airavata.mft.common.DelegateAuth;
+import org.apache.airavata.mft.common.PasswordAuth;
import org.apache.airavata.mft.resource.server.backend.ResourceBackend;
import org.apache.airavata.mft.resource.stubs.azure.storage.*;
import org.apache.airavata.mft.resource.stubs.box.storage.*;
@@ -38,6 +42,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
import java.util.Optional;
public class DatalakeResourceBackend implements ResourceBackend {
@@ -72,14 +78,44 @@
}
}
+ private DRMSServiceAuthToken getDrmsToken(AuthToken authToken) {
+ switch (authToken.getAuthMechanismCase()) {
+ case USERTOKENAUTH:
+ return DRMSServiceAuthToken.newBuilder().setAccessToken(authToken.getUserTokenAuth().getToken()).build();
+
+ case DELEGATEAUTH:
+ DelegateAuth delegateAuth = authToken.getDelegateAuth();
+ return DRMSServiceAuthToken.newBuilder()
+ .setAccessToken(Base64.getEncoder()
+ .encodeToString((delegateAuth.getClientId() + ":" + delegateAuth.getClientSecret())
+ .getBytes(StandardCharsets.UTF_8)))
+ .setAuthCredentialType(AuthCredentialType.AGENT_ACCOUNT_CREDENTIAL)
+ .setAuthenticatedUser(AuthenticatedUser.newBuilder()
+ .setUsername(delegateAuth.getUserId())
+ .setTenantId(delegateAuth.getPropertiesOrThrow("TENANT_ID"))
+ .build())
+ .build();
+
+ }
+ return null;
+
+ }
@Override
public Optional<GenericResource> getGenericResource(GenericResourceGetRequest request) throws Exception {
AuthToken authzToken = request.getAuthzToken();
+ DRMSServiceAuthToken drmsServiceAuthToken = getDrmsToken(authzToken);
+
+ if (drmsServiceAuthToken == null) {
+ logger.error("DRMS Service auth token can not be null. Invalid token type {} specified",
+ authzToken.getAuthMechanismCase());
+ throw new Exception("DRMS Service auth token can not be null. Invalid token type specified");
+ }
+
ResourceServiceGrpc.ResourceServiceBlockingStub datalakeResourceStub = ResourceServiceGrpc.newBlockingStub(channel);
ResourceFetchResponse resourceFetchResponse = datalakeResourceStub.fetchResource(ResourceFetchRequest.newBuilder()
- .setAuthToken(DRMSServiceAuthToken.newBuilder().setAccessToken(authzToken.getUserTokenAuth().getToken()).build())
+ .setAuthToken(drmsServiceAuthToken)
.setResourceId(request.getResourceId())
.build());
@@ -105,10 +141,9 @@
switch (resource.getStorageCase()) {
case SSH_STORAGE:
SSHStorage storage = resource.getSshStorage();
-// resourceBuilder.setScpStorage(SCPStorage.newBuilder()
-// .setStorageId(storage.getStorageId()).setHost(storage.getHostName())
-// .setPort(storage.getPort())
-// .setUser(sshPreference.getUserName()).build());
+ resourceBuilder.setScpStorage(SCPStorage.newBuilder()
+ .setStorageId(storage.getStorageId()).setHost(storage.getHostName())
+ .setPort(storage.getPort()).build());
break;
case S3_STORAGE:
org.apache.airavata.datalake.drms.storage.s3.S3Storage s3Storage = resource.getS3Storage();
@@ -299,6 +334,4 @@
public boolean deleteFTPStorage(FTPStorageDeleteRequest request) throws Exception {
return false;
}
-
-
}
diff --git a/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/handler/GenericResourceServiceHandler.java b/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/handler/GenericResourceServiceHandler.java
index 491cea2..5ed85e4 100644
--- a/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/handler/GenericResourceServiceHandler.java
+++ b/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/handler/GenericResourceServiceHandler.java
@@ -33,7 +33,7 @@
logger.error("Failed in retrieving generic resource with id {}", request.getResourceId(), e);
responseObserver.onError(Status.INTERNAL.withCause(e)
- .withDescription("Failed in retrieving GCS resource with id " + request.getResourceId())
+ .withDescription("Failed in retrieving Generic resource with id " + request.getResourceId())
.asRuntimeException());
}
}
diff --git a/services/resource-service/server/src/main/resources/applicationContext.xml b/services/resource-service/server/src/main/resources/applicationContext.xml
index 2fbd7eb..ea845a3 100644
--- a/services/resource-service/server/src/main/resources/applicationContext.xml
+++ b/services/resource-service/server/src/main/resources/applicationContext.xml
@@ -6,7 +6,7 @@
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd">
- <bean id="resourceBackend" class="org.apache.airavata.mft.resource.server.backend.datalake.DatalakeResourceBackend"
+ <bean id="resourceBackend" class="org.apache.airavata.mft.resource.server.backend.file.FileBasedResourceBackend"
init-method="init" destroy-method="destroy"></bean>
</beans>
\ No newline at end of file
diff --git a/services/secret-service/server/pom.xml b/services/secret-service/server/pom.xml
index 8293b24..bfcc650 100644
--- a/services/secret-service/server/pom.xml
+++ b/services/secret-service/server/pom.xml
@@ -78,6 +78,17 @@
<artifactId>json-simple</artifactId>
<version>1.1.1</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.airavata.data.lake</groupId>
+ <artifactId>drms-stubs</artifactId>
+ <version>0.01-SNAPSHOT</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j-impl</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
</dependencies>
<build>
diff --git a/services/secret-service/server/src/main/java/org/apache/airavata/mft/secret/server/backend/custos/CustosSecretBackend.java b/services/secret-service/server/src/main/java/org/apache/airavata/mft/secret/server/backend/custos/CustosSecretBackend.java
index 601f2d8..c71ef77 100644
--- a/services/secret-service/server/src/main/java/org/apache/airavata/mft/secret/server/backend/custos/CustosSecretBackend.java
+++ b/services/secret-service/server/src/main/java/org/apache/airavata/mft/secret/server/backend/custos/CustosSecretBackend.java
@@ -1,5 +1,13 @@
package org.apache.airavata.mft.secret.server.backend.custos;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import org.apache.airavata.datalake.drms.AuthCredentialType;
+import org.apache.airavata.datalake.drms.AuthenticatedUser;
+import org.apache.airavata.datalake.drms.DRMSServiceAuthToken;
+import org.apache.airavata.datalake.drms.storage.*;
+import org.apache.airavata.datalake.drms.storage.preference.ssh.SSHStoragePreference;
+import org.apache.airavata.mft.common.AuthToken;
import org.apache.airavata.mft.common.DelegateAuth;
import org.apache.airavata.mft.credential.stubs.azure.*;
import org.apache.airavata.mft.credential.stubs.box.*;
@@ -24,6 +32,8 @@
import org.springframework.beans.factory.annotation.Value;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
import java.util.Map;
import java.util.Optional;
@@ -45,6 +55,12 @@
@Value("${custos.secret}")
private String custosSecret;
+ @Value("${custos.backend.drms.host}")
+ private String drmsHost;
+
+ @Value("${custos.backend.drms.port}")
+ private int drmsPort;
+
private AgentAuthenticationHandler handler;
private CustosClientsFactory custosClientsFactory;
@@ -88,18 +104,80 @@
}
}
+ private AnyStoragePreference getStoragePreference(String storagePefId) {
+ return AnyStoragePreference.newBuilder().build();
+ }
+
+ private DRMSServiceAuthToken getDrmsToken(AuthToken authToken) {
+ switch (authToken.getAuthMechanismCase()) {
+ case USERTOKENAUTH:
+ return DRMSServiceAuthToken.newBuilder().setAccessToken(authToken.getUserTokenAuth().getToken()).build();
+
+ case DELEGATEAUTH:
+ DelegateAuth delegateAuth = authToken.getDelegateAuth();
+ return DRMSServiceAuthToken.newBuilder()
+ .setAccessToken(Base64.getEncoder()
+ .encodeToString((delegateAuth.getClientId() + ":" + delegateAuth.getClientSecret())
+ .getBytes(StandardCharsets.UTF_8)))
+ .setAuthCredentialType(AuthCredentialType.AGENT_ACCOUNT_CREDENTIAL)
+ .setAuthenticatedUser(AuthenticatedUser.newBuilder()
+ .setUsername(delegateAuth.getUserId())
+ .setTenantId(delegateAuth.getPropertiesOrThrow("TENANT_ID"))
+ .build())
+ .build();
+ }
+ return null;
+ }
+
+
@Override
public Optional<SCPSecret> getSCPSecret(SCPSecretGetRequest request) throws Exception {
+
+ DRMSServiceAuthToken drmsToken = getDrmsToken(request.getAuthzToken());
+
+ if (drmsToken == null) {
+ LOGGER.error("DRMS Token can not be null");
+ return Optional.empty();
+ }
+
+ String storagePrefId = request.getSecretId();
+
+ ManagedChannel channel = ManagedChannelBuilder.forAddress(drmsHost, drmsPort).usePlaintext().build();
+ AnyStoragePreference storagePreference;
+
+ try {
+ StoragePreferenceServiceGrpc.StoragePreferenceServiceBlockingStub spClient =
+ StoragePreferenceServiceGrpc.newBlockingStub(channel);
+
+ StoragePreferenceFetchResponse storagePreferenceResp = spClient.
+ fetchStoragePreference(StoragePreferenceFetchRequest.newBuilder().
+ setAuthToken(drmsToken).setStoragePreferenceId(storagePrefId).build());
+
+ storagePreference = storagePreferenceResp.getStoragePreference();
+ } finally {
+ channel.shutdown();
+ }
+
+ SSHStoragePreference sshStoragePreference;
+ if (storagePreference.getStorageCase() == AnyStoragePreference.StorageCase.SSH_STORAGE_PREFERENCE) {
+ sshStoragePreference = storagePreference.getSshStoragePreference();
+ } else {
+ LOGGER.error("Invalid storage case {} for preference {}", storagePreference.getStorageCase(), storagePrefId);
+ return Optional.empty();
+ }
+
switch (request.getAuthzToken().getAuthMechanismCase()) {
case AGENTAUTH:
String agentId = request.getAuthzToken().getAgentAuth().getAgentId();
String secret = request.getAuthzToken().getAgentAuth().getAgentSecret();
+
Optional<AuthConfig> optionalAuthConfig = handler.authenticate(agentId, secret);
if (optionalAuthConfig.isPresent()) {
AuthConfig authConfig = optionalAuthConfig.get();
SSHCredential sshCredential = csAgentClient.getSSHCredential(request.getAuthzToken().getAgentAuth().getToken(),
- authConfig.getAccessToken(), request.getSecretId(), false);
+ authConfig.getAccessToken(), sshStoragePreference.getCredentialToken(), false);
SCPSecret scpSecret = SCPSecret.newBuilder()
+ .setUser(sshStoragePreference.getUserName())
.setSecretId(sshCredential.getMetadata().getToken())
.setPublicKey(sshCredential.getPublicKey())
.setPassphrase(sshCredential.getPassphrase())
@@ -110,8 +188,9 @@
case USERTOKENAUTH:
if (identityClient.isAuthenticated(request.getAuthzToken().getUserTokenAuth().getToken())) {
//custosId need to be replaced with actual gateway custos Id
- SSHCredential sshCredential = csClient.getSSHCredential(custosId, request.getSecretId(), false);
+ SSHCredential sshCredential = csClient.getSSHCredential(custosId, sshStoragePreference.getCredentialToken(), false);
SCPSecret scpSecret = SCPSecret.newBuilder()
+ .setUser(sshStoragePreference.getUserName())
.setSecretId(sshCredential.getMetadata().getToken())
.setPublicKey(sshCredential.getPublicKey())
.setPassphrase(sshCredential.getPassphrase())
@@ -122,9 +201,10 @@
case DELEGATEAUTH:
DelegateAuth delegateAuth = request.getAuthzToken().getDelegateAuth();
ResourceSecretManagementClient csClient = getTenantResourceSecretManagementClient(delegateAuth);
- SSHCredential sshCredential = csClient.getSSHCredential(delegateAuth.getPropertiesMap().get("PORTAL_CUSTOS_ID"),
- request.getSecretId(), false);
+ SSHCredential sshCredential = csClient.getSSHCredential(delegateAuth.getPropertiesMap().get("TENANT_ID"),
+ sshStoragePreference.getCredentialToken(), false);
SCPSecret scpSecret = SCPSecret.newBuilder()
+ .setUser(sshStoragePreference.getUserName())
.setSecretId(sshCredential.getMetadata().getToken())
.setPublicKey(sshCredential.getPublicKey())
.setPassphrase(sshCredential.getPassphrase())
@@ -183,7 +263,7 @@
case DELEGATEAUTH:
DelegateAuth delegateAuth = request.getAuthzToken().getDelegateAuth();
ResourceSecretManagementClient csClient = getTenantResourceSecretManagementClient(delegateAuth);
- CredentialMap credentialMap = csClient.getCredentialMap(delegateAuth.getPropertiesMap().get("PORTAL_CUSTOS_ID"),
+ CredentialMap credentialMap = csClient.getCredentialMap(delegateAuth.getPropertiesMap().get("TENANT_ID"),
request.getSecretId());
Map<String, String> secretValues = credentialMap.getCredentialMapMap();
S3Secret s3Secret = S3Secret.newBuilder()
@@ -242,7 +322,7 @@
case DELEGATEAUTH:
DelegateAuth delegateAuth = request.getAuthzToken().getDelegateAuth();
ResourceSecretManagementClient csClient = getTenantResourceSecretManagementClient(delegateAuth);
- CredentialMap credentialMap = csClient.getCredentialMap(delegateAuth.getPropertiesMap().get("PORTAL_CUSTOS_ID"),
+ CredentialMap credentialMap = csClient.getCredentialMap(delegateAuth.getPropertiesMap().get("TENANT_ID"),
request.getSecretId());
Map<String, String> secretValues = credentialMap.getCredentialMapMap();
BoxSecret boxSecret = BoxSecret.newBuilder()
@@ -305,7 +385,7 @@
case DELEGATEAUTH:
DelegateAuth delegateAuth = request.getAuthzToken().getDelegateAuth();
ResourceSecretManagementClient csClient = getTenantResourceSecretManagementClient(delegateAuth);
- CredentialMap credentialMap = csClient.getCredentialMap(delegateAuth.getPropertiesMap().get("PORTAL_CUSTOS_ID"),
+ CredentialMap credentialMap = csClient.getCredentialMap(delegateAuth.getPropertiesMap().get("TENANT_ID"),
request.getSecretId());
Map<String, String> secretValues = credentialMap.getCredentialMapMap();
AzureSecret azureSecret = AzureSecret.newBuilder()
@@ -367,7 +447,7 @@
case DELEGATEAUTH:
DelegateAuth delegateAuth = request.getAuthzToken().getDelegateAuth();
ResourceSecretManagementClient csClient = getTenantResourceSecretManagementClient(delegateAuth);
- CredentialMap credentialMap = csClient.getCredentialMap(delegateAuth.getPropertiesMap().get("PORTAL_CUSTOS_ID"),
+ CredentialMap credentialMap = csClient.getCredentialMap(delegateAuth.getPropertiesMap().get("TENANT_ID"),
request.getSecretId());
Map<String, String> secretValues = credentialMap.getCredentialMapMap();
GCSSecret gcsSecret = GCSSecret.newBuilder()
@@ -429,7 +509,7 @@
case DELEGATEAUTH:
DelegateAuth delegateAuth = request.getAuthzToken().getDelegateAuth();
ResourceSecretManagementClient csClient = getTenantResourceSecretManagementClient(delegateAuth);
- CredentialMap credentialMap = csClient.getCredentialMap(delegateAuth.getPropertiesMap().get("PORTAL_CUSTOS_ID"),
+ CredentialMap credentialMap = csClient.getCredentialMap(delegateAuth.getPropertiesMap().get("TENANT_ID"),
request.getSecretId());
Map<String, String> secretValues = credentialMap.getCredentialMapMap();
DropboxSecret dropboxSecret = DropboxSecret.newBuilder()
@@ -492,9 +572,10 @@
break;
case DELEGATEAUTH:
DelegateAuth delegateAuth = request.getAuthzToken().getDelegateAuth();
+ // TODO validate delegate auth token
ResourceSecretManagementClient csClient = getTenantResourceSecretManagementClient(delegateAuth);
PasswordCredential passwordCredential = csClient
- .getPasswordCredential(delegateAuth.getPropertiesMap().get("PORTAL_CUSTOS_ID"),
+ .getPasswordCredential(delegateAuth.getPropertiesMap().get("TENANT_ID"),
request.getSecretId());
FTPSecret ftpSecret = FTPSecret.newBuilder()
.setSecretId(request.getSecretId())
@@ -523,10 +604,8 @@
private ResourceSecretManagementClient getTenantResourceSecretManagementClient(DelegateAuth delegateAuth) throws IOException {
- String adminCustosId = delegateAuth.getClientId();
- String adminCustosSecret = delegateAuth.getClientSecret();
CustosClientProvider custosClientProvider = custosClientsFactory
- .getCustosClientProvider(adminCustosId, adminCustosSecret);
+ .getCustosClientProvider(custosId, custosSecret);
return custosClientProvider
.getResourceSecretManagementClient();
}
diff --git a/transport/azure-transport/pom.xml b/transport/azure-transport/pom.xml
index d2d8d5f..6d05805 100644
--- a/transport/azure-transport/pom.xml
+++ b/transport/azure-transport/pom.xml
@@ -36,7 +36,7 @@
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-storage-blob</artifactId>
- <version>12.0.0</version>
+ <version>12.13.0</version>
</dependency>
<dependency>
<groupId>org.apache.airavata</groupId>
diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3IncomingConnector.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3IncomingConnector.java
new file mode 100644
index 0000000..03b745b
--- /dev/null
+++ b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3IncomingConnector.java
@@ -0,0 +1,96 @@
+package org.apache.airavata.mft.transport.s3;
+
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.services.s3.model.GetObjectRequest;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.S3Object;
+import org.apache.airavata.mft.core.api.ConnectorConfig;
+import org.apache.airavata.mft.core.api.IncomingChunkedConnector;
+import org.apache.airavata.mft.core.api.IncomingStreamingConnector;
+import org.apache.airavata.mft.credential.stubs.s3.S3Secret;
+import org.apache.airavata.mft.credential.stubs.s3.S3SecretGetRequest;
+import org.apache.airavata.mft.resource.client.ResourceServiceClient;
+import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
+import org.apache.airavata.mft.resource.stubs.common.GenericResource;
+import org.apache.airavata.mft.resource.stubs.common.GenericResourceGetRequest;
+import org.apache.airavata.mft.resource.stubs.s3.storage.S3Storage;
+import org.apache.airavata.mft.secret.client.SecretServiceClient;
+import org.apache.airavata.mft.secret.client.SecretServiceClientBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.InputStream;
+
+public class S3IncomingConnector implements IncomingChunkedConnector, IncomingStreamingConnector {
+
+ private static final Logger logger = LoggerFactory.getLogger(S3IncomingConnector.class);
+
+ private GenericResource resource;
+ private AmazonS3 s3Client;
+
+ @Override
+ public void init(ConnectorConfig cc) throws Exception {
+ try (ResourceServiceClient resourceClient = ResourceServiceClientBuilder
+ .buildClient(cc.getResourceServiceHost(), cc.getResourceServicePort())) {
+
+ resource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder()
+ .setAuthzToken(cc.getAuthToken())
+ .setResourceId(cc.getResourceId()).build());
+ }
+
+ if (resource.getStorageCase() != GenericResource.StorageCase.S3STORAGE) {
+ logger.error("Invalid storage type {} specified for resource {}", resource.getStorageCase(), cc.getResourceId());
+ throw new Exception("Invalid storage type specified for resource " + cc.getResourceId());
+ }
+
+ S3Storage s3Storage = resource.getS3Storage();
+
+ S3Secret s3Secret;
+
+ try (SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(
+ cc.getSecretServiceHost(), cc.getSecretServicePort())) {
+
+ s3Secret = secretClient.s3().getS3Secret(S3SecretGetRequest.newBuilder()
+ .setAuthzToken(cc.getAuthToken())
+ .setSecretId(cc.getCredentialToken()).build());
+
+ BasicAWSCredentials awsCreds = new BasicAWSCredentials(s3Secret.getAccessKey(), s3Secret.getSecretKey());
+
+ s3Client = AmazonS3ClientBuilder.standard()
+ .withCredentials(new AWSStaticCredentialsProvider(awsCreds))
+ .withRegion(s3Storage.getRegion())
+ .build();
+ }
+ }
+
+
+ @Override
+ public InputStream fetchInputStream() throws Exception {
+ S3Object s3object = s3Client.getObject(resource.getS3Storage().getBucketName(), resource.getFile().getResourcePath());
+ return s3object.getObjectContent();
+ }
+
+ @Override
+ public InputStream fetchInputStream(String childPath) throws Exception {
+ S3Object s3object = s3Client.getObject(resource.getS3Storage().getBucketName(), childPath);
+ return s3object.getObjectContent();
+ }
+
+ @Override
+ public void downloadChunk(int chunkId, long startByte, long endByte, String downloadFile) throws Exception {
+ GetObjectRequest rangeObjectRequest = new GetObjectRequest(resource.getS3Storage().getBucketName(),
+ resource.getFile().getResourcePath());
+ rangeObjectRequest.setRange(startByte, endByte - 1);
+ s3Client.getObject(rangeObjectRequest, new File(downloadFile));
+ logger.info("Downloaded S3 chunk to path {} for resource id {}", downloadFile, resource.getResourceId());
+ }
+
+ @Override
+ public void complete() throws Exception {
+
+ }
+}
diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3OutgoingConnector.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3OutgoingConnector.java
new file mode 100644
index 0000000..3e95383
--- /dev/null
+++ b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3OutgoingConnector.java
@@ -0,0 +1,101 @@
+package org.apache.airavata.mft.transport.s3;
+
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.services.s3.model.*;
+import org.apache.airavata.mft.core.api.ConnectorConfig;
+import org.apache.airavata.mft.core.api.OutgoingChunkedConnector;
+import org.apache.airavata.mft.credential.stubs.s3.S3Secret;
+import org.apache.airavata.mft.credential.stubs.s3.S3SecretGetRequest;
+import org.apache.airavata.mft.resource.client.ResourceServiceClient;
+import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
+import org.apache.airavata.mft.resource.stubs.common.GenericResource;
+import org.apache.airavata.mft.resource.stubs.common.GenericResourceGetRequest;
+import org.apache.airavata.mft.resource.stubs.s3.storage.S3Storage;
+import org.apache.airavata.mft.secret.client.SecretServiceClient;
+import org.apache.airavata.mft.secret.client.SecretServiceClientBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class S3OutgoingConnector implements OutgoingChunkedConnector {
+
+ private static final Logger logger = LoggerFactory.getLogger(S3OutgoingConnector.class);
+
+ private GenericResource resource;
+ private AmazonS3 s3Client;
+
+ InitiateMultipartUploadResult initResponse;
+ List<PartETag> partETags = Collections.synchronizedList(new ArrayList<>());
+
+ @Override
+ public void init(ConnectorConfig cc) throws Exception {
+ try (ResourceServiceClient resourceClient = ResourceServiceClientBuilder
+ .buildClient(cc.getResourceServiceHost(), cc.getResourceServicePort())) {
+
+ resource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder()
+ .setAuthzToken(cc.getAuthToken())
+ .setResourceId(cc.getResourceId()).build());
+ }
+
+ if (resource.getStorageCase() != GenericResource.StorageCase.S3STORAGE) {
+ logger.error("Invalid storage type {} specified for resource {}", resource.getStorageCase(), cc.getResourceId());
+ throw new Exception("Invalid storage type specified for resource " + cc.getResourceId());
+ }
+
+ S3Storage s3Storage = resource.getS3Storage();
+
+ S3Secret s3Secret;
+
+ try (SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(
+ cc.getSecretServiceHost(), cc.getSecretServicePort())) {
+
+ s3Secret = secretClient.s3().getS3Secret(S3SecretGetRequest.newBuilder()
+ .setAuthzToken(cc.getAuthToken())
+ .setSecretId(cc.getCredentialToken()).build());
+
+ BasicAWSCredentials awsCreds = new BasicAWSCredentials(s3Secret.getAccessKey(), s3Secret.getSecretKey());
+
+ s3Client = AmazonS3ClientBuilder.standard()
+ .withCredentials(new AWSStaticCredentialsProvider(awsCreds))
+ .withRegion(s3Storage.getRegion())
+ .build();
+ }
+
+ InitiateMultipartUploadRequest initRequest = new InitiateMultipartUploadRequest(resource.getS3Storage().getBucketName(),
+ resource.getFile().getResourcePath());
+ initResponse = s3Client.initiateMultipartUpload(initRequest);
+ }
+
+ @Override
+ public void uploadChunk(int chunkId, long startByte, long endByte, String uploadFile) throws Exception {
+ File file = new File(uploadFile);
+ UploadPartRequest uploadRequest = new UploadPartRequest()
+ .withBucketName(resource.getS3Storage().getBucketName())
+ .withKey(resource.getFile().getResourcePath())
+ .withUploadId(initResponse.getUploadId())
+ .withPartNumber(chunkId + 1)
+ .withFileOffset(0)
+ .withFile(file)
+ .withPartSize(file.length());
+
+ UploadPartResult uploadResult = s3Client.uploadPart(uploadRequest);
+ this.partETags.add(uploadResult.getPartETag());
+ logger.info("Uploaded S3 chunk to path {} for resource id {}", uploadFile, resource.getResourceId());
+ }
+
+
+ @Override
+ public void complete() throws Exception {
+ partETags.sort(java.util.Comparator.comparingInt(PartETag::getPartNumber));
+ CompleteMultipartUploadRequest compRequest = new CompleteMultipartUploadRequest(resource.getS3Storage().getBucketName(), resource.getFile().getResourcePath(), initResponse.getUploadId(), partETags);
+ s3Client.completeMultipartUpload(compRequest);
+ }
+}
diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Receiver.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Receiver.java
deleted file mode 100644
index a430323..0000000
--- a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Receiver.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.airavata.mft.transport.s3;
-
-import com.amazonaws.auth.AWSStaticCredentialsProvider;
-import com.amazonaws.auth.BasicAWSCredentials;
-import com.amazonaws.services.s3.AmazonS3;
-import com.amazonaws.services.s3.AmazonS3ClientBuilder;
-import com.amazonaws.services.s3.model.S3Object;
-import com.amazonaws.services.s3.model.S3ObjectInputStream;
-import org.apache.airavata.mft.common.AuthToken;
-import org.apache.airavata.mft.core.ConnectorContext;
-import org.apache.airavata.mft.core.api.Connector;
-import org.apache.airavata.mft.credential.stubs.s3.S3Secret;
-import org.apache.airavata.mft.credential.stubs.s3.S3SecretGetRequest;
-import org.apache.airavata.mft.resource.client.ResourceServiceClient;
-import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
-import org.apache.airavata.mft.resource.stubs.common.GenericResource;
-import org.apache.airavata.mft.resource.stubs.common.GenericResourceGetRequest;
-import org.apache.airavata.mft.resource.stubs.s3.storage.S3Storage;
-import org.apache.airavata.mft.secret.client.SecretServiceClient;
-import org.apache.airavata.mft.secret.client.SecretServiceClientBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.*;
-
-public class S3Receiver implements Connector {
-
- private static final Logger logger = LoggerFactory.getLogger(S3Receiver.class);
-
- private AmazonS3 s3Client;
-
- private String resourceServiceHost;
- private int resourceServicePort;
- private String secretServiceHost;
- private int secretServicePort;
-
- @Override
- public void init(String resourceServiceHost, int resourceServicePort,
- String secretServiceHost, int secretServicePort) throws Exception {
-
- this.resourceServiceHost = resourceServiceHost;
- this.resourceServicePort = resourceServicePort;
- this.secretServiceHost = secretServiceHost;
- this.secretServicePort = secretServicePort;
- }
-
- @Override
- public void destroy() {
-
- }
-
- @Override
- public void startStream(AuthToken authToken, String resourceId, String credentialToken, ConnectorContext context) throws Exception {
-
- logger.info("Starting S3 Receiver stream for transfer {}", context.getTransferId());
-
- ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
- GenericResource resource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder()
- .setResourceId(resourceId).build());
-
- if (resource.getStorageCase() != GenericResource.StorageCase.S3STORAGE) {
- logger.error("Invalid storage type {} specified for resource {}", resource.getStorageCase(), resourceId);
- throw new Exception("Invalid storage type specified for resource " + resourceId);
- }
-
- S3Storage s3Storage = resource.getS3Storage();
-
- SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
- S3Secret s3Secret = secretClient.s3().getS3Secret(S3SecretGetRequest.newBuilder().setSecretId(credentialToken).build());
- BasicAWSCredentials awsCreds = new BasicAWSCredentials(s3Secret.getAccessKey(), s3Secret.getSecretKey());
-
- s3Client = AmazonS3ClientBuilder.standard()
- .withCredentials(new AWSStaticCredentialsProvider(awsCreds))
- .withRegion(s3Storage.getRegion())
- .build();
-
- S3Object s3object = s3Client.getObject(s3Storage.getBucketName(), resource.getFile().getResourcePath());
- S3ObjectInputStream inputStream = s3object.getObjectContent();
-
- OutputStream os = context.getStreamBuffer().getOutputStream();
- int read;
- long bytes = 0;
- while ((read = inputStream.read()) != -1) {
- bytes++;
- os.write(read);
- }
- os.flush();
- os.close();
-
- logger.info("Completed S3 Receiver stream for transfer {}", context.getTransferId());
- }
-
- @Override
- public void startStream(AuthToken authToken, String resourceId, String childResourcePath, String credentialToken,
- ConnectorContext context) throws Exception {
- throw new UnsupportedOperationException();
- }
-}
diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Sender.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Sender.java
deleted file mode 100644
index 0dec96d..0000000
--- a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Sender.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.airavata.mft.transport.s3;
-
-import com.amazonaws.auth.AWSStaticCredentialsProvider;
-import com.amazonaws.auth.BasicAWSCredentials;
-import com.amazonaws.services.s3.AmazonS3;
-import com.amazonaws.services.s3.AmazonS3ClientBuilder;
-import com.amazonaws.services.s3.model.ObjectMetadata;
-import org.apache.airavata.mft.common.AuthToken;
-import org.apache.airavata.mft.core.ConnectorContext;
-import org.apache.airavata.mft.core.api.Connector;
-import org.apache.airavata.mft.credential.stubs.s3.S3Secret;
-import org.apache.airavata.mft.credential.stubs.s3.S3SecretGetRequest;
-import org.apache.airavata.mft.resource.client.ResourceServiceClient;
-import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
-import org.apache.airavata.mft.resource.stubs.common.GenericResource;
-import org.apache.airavata.mft.resource.stubs.common.GenericResourceGetRequest;
-import org.apache.airavata.mft.resource.stubs.s3.storage.S3Storage;
-import org.apache.airavata.mft.secret.client.SecretServiceClient;
-import org.apache.airavata.mft.secret.client.SecretServiceClientBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class S3Sender implements Connector {
-
- private static final Logger logger = LoggerFactory.getLogger(S3Sender.class);
-
- private AmazonS3 s3Client;
-
- private String resourceServiceHost;
- private int resourceServicePort;
- private String secretServiceHost;
- private int secretServicePort;
-
- @Override
- public void init(String resourceServiceHost, int resourceServicePort, String secretServiceHost, int secretServicePort) throws Exception {
-
- this.resourceServiceHost = resourceServiceHost;
- this.resourceServicePort = resourceServicePort;
- this.secretServiceHost = secretServiceHost;
- this.secretServicePort = secretServicePort;
- }
-
- @Override
- public void destroy() {
-
- }
-
- @Override
- public void startStream(AuthToken authToken, String resourceId, String credentialToken, ConnectorContext context) throws Exception {
-
- logger.info("Starting S3 Sender stream for transfer {}", context.getTransferId());
- logger.info("Content length for transfer {} {}", context.getTransferId(), context.getMetadata().getResourceSize());
-
- ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
- GenericResource resource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder()
- .setResourceId(resourceId).build());
-
- if (resource.getStorageCase() != GenericResource.StorageCase.S3STORAGE) {
- logger.error("Invalid storage type {} specified for resource {}", resource.getStorageCase(), resourceId);
- throw new Exception("Invalid storage type specified for resource " + resourceId);
- }
-
- S3Storage s3Storage = resource.getS3Storage();
-
- SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
- S3Secret s3Secret = secretClient.s3().getS3Secret(S3SecretGetRequest.newBuilder().setSecretId(credentialToken).build());
- BasicAWSCredentials awsCreds = new BasicAWSCredentials(s3Secret.getAccessKey(), s3Secret.getSecretKey());
-
- s3Client = AmazonS3ClientBuilder.standard()
- .withCredentials(new AWSStaticCredentialsProvider(awsCreds))
- .withRegion(s3Storage.getRegion())
- .build();
-
- ObjectMetadata metadata = new ObjectMetadata();
- metadata.setContentLength(context.getMetadata().getResourceSize());
-
- s3Client.putObject(s3Storage.getBucketName(), resource.getFile().getResourcePath(),
- context.getStreamBuffer().getInputStream(), metadata);
- logger.info("Completed S3 Sender stream for transfer {}", context.getTransferId());
- }
-
- @Override
- public void startStream(AuthToken authToken, String resourceId, String childResourcePath, String credentialToken,
- ConnectorContext context) throws Exception {
- throw new UnsupportedOperationException();
- }
-}
diff --git a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/LimitInputStream.java b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/LimitInputStream.java
new file mode 100644
index 0000000..806f7c2
--- /dev/null
+++ b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/LimitInputStream.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.transport.scp;
+
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.channels.Channel;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class LimitInputStream extends FilterInputStream implements Channel {
+ private final AtomicBoolean open = new AtomicBoolean(true);
+ private long remaining;
+
+ public LimitInputStream(InputStream in, long length) {
+ super(in);
+ this.remaining = length;
+ }
+
+ public boolean isOpen() {
+ return this.open.get();
+ }
+
+ public int read() throws IOException {
+ if (!this.isOpen()) {
+ throw new IOException("read() - stream is closed (remaining=" + this.remaining + ")");
+ } else if (this.remaining > 0L) {
+ --this.remaining;
+ return super.read();
+ } else {
+ return -1;
+ }
+ }
+
+ public int read(byte[] b, int off, int len) throws IOException {
+ if (!this.isOpen()) {
+ throw new IOException("read(len=" + len + ") stream is closed (remaining=" + this.remaining + ")");
+ } else {
+ int nb = len;
+ if ((long)len > this.remaining) {
+ nb = (int)this.remaining;
+ }
+
+ if (nb > 0) {
+ int read = super.read(b, off, nb);
+ if (read > 0) { this.remaining -= read; }
+ return read;
+ } else {
+ return -1;
+ }
+ }
+ }
+
+ public long skip(long n) throws IOException {
+ if (!this.isOpen()) {
+ throw new IOException("skip(" + n + ") stream is closed (remaining=" + this.remaining + ")");
+ } else {
+ long skipped = super.skip(n);
+ this.remaining -= skipped;
+ return skipped;
+ }
+ }
+
+ public int available() throws IOException {
+ if (!this.isOpen()) {
+ throw new IOException("available() stream is closed (remaining=" + this.remaining + ")");
+ } else {
+ int av = super.available();
+ return (long)av > this.remaining ? (int)this.remaining : av;
+ }
+ }
+
+ public void close() throws IOException {
+ if (!this.open.getAndSet(false)) {
+ ;
+ }
+ }
+}
+
diff --git a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPIncomingConnector.java b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPIncomingConnector.java
new file mode 100644
index 0000000..d6bacdb
--- /dev/null
+++ b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPIncomingConnector.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.transport.scp;
+
+import com.jcraft.jsch.Channel;
+import com.jcraft.jsch.ChannelExec;
+import com.jcraft.jsch.Session;
+import org.apache.airavata.mft.core.api.ConnectorConfig;
+import org.apache.airavata.mft.core.api.IncomingStreamingConnector;
+import org.apache.airavata.mft.credential.stubs.scp.SCPSecret;
+import org.apache.airavata.mft.credential.stubs.scp.SCPSecretGetRequest;
+import org.apache.airavata.mft.resource.client.ResourceServiceClient;
+import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
+import org.apache.airavata.mft.resource.stubs.common.GenericResource;
+import org.apache.airavata.mft.resource.stubs.common.GenericResourceGetRequest;
+import org.apache.airavata.mft.resource.stubs.scp.storage.SCPStorage;
+import org.apache.airavata.mft.secret.client.SecretServiceClient;
+import org.apache.airavata.mft.secret.client.SecretServiceClientBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+public final class SCPIncomingConnector implements IncomingStreamingConnector {
+
+ private static final Logger logger = LoggerFactory.getLogger(SCPIncomingConnector.class);
+
+ private Session session;
+ private GenericResource resource;
+ private Channel channel;
+ private OutputStream out;
+ private InputStream in;
+ private final byte[] buf = new byte[1024];
+
+ @Override
+ public void init(ConnectorConfig cc) throws Exception {
+
+ try (ResourceServiceClient resourceClient = ResourceServiceClientBuilder
+ .buildClient(cc.getResourceServiceHost(), cc.getResourceServicePort())) {
+
+ resource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder()
+ .setAuthzToken(cc.getAuthToken())
+ .setResourceId(cc.getResourceId()).build());
+ }
+
+ if (resource.getStorageCase() != GenericResource.StorageCase.SCPSTORAGE) {
+ logger.error("Invalid storage type {} specified for resource {}", resource.getStorageCase(), cc.getResourceId());
+ throw new Exception("Invalid storage type specified for resource " + cc.getResourceId());
+ }
+
+ SCPSecret scpSecret;
+
+ try (SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(
+ cc.getSecretServiceHost(), cc.getSecretServicePort())) {
+
+ scpSecret = secretClient.scp().getSCPSecret(SCPSecretGetRequest.newBuilder()
+ .setAuthzToken(cc.getAuthToken())
+ .setSecretId(cc.getCredentialToken()).build());
+ }
+
+ SCPStorage scpStorage = resource.getScpStorage();
+ logger.info("Creating a ssh session for {}@{}:{}", scpSecret.getUser(), scpStorage.getHost(), scpStorage.getPort());
+
+ this.session = SCPTransportUtil.createSession(
+ scpSecret.getUser(),
+ scpStorage.getHost(),
+ scpStorage.getPort(),
+ scpSecret.getPrivateKey().getBytes(),
+ scpSecret.getPublicKey().getBytes(),
+ scpSecret.getPassphrase().equals("")? null : scpSecret.getPassphrase().getBytes());
+
+ if (session == null) {
+ logger.error("Session can not be null. Make sure that SCP Receiver is properly initialized");
+ throw new Exception("Session can not be null. Make sure that SCP Receiver is properly initialized");
+ }
+ }
+
+ private String escapeSpecialChars(String path) {
+ return path.replace(" ", "\\ ");
+ }
+
+ @Override
+ public InputStream fetchInputStream() throws Exception {
+ String resourcePath = null;
+ switch (resource.getResourceCase()){
+ case FILE:
+ resourcePath = resource.getFile().getResourcePath();
+ break;
+ case DIRECTORY:
+ throw new Exception("A directory path can not be streamed");
+ case RESOURCE_NOT_SET:
+ throw new Exception("Resource was not set in resource with id " + resource.getResourceId());
+ }
+
+ return fetchInputStreamJCraft(escapeSpecialChars(resourcePath));
+ }
+
+ @Override
+ public InputStream fetchInputStream(String childPath) throws Exception {
+
+ String resourcePath = null;
+ switch (resource.getResourceCase()){
+ case FILE:
+ throw new Exception("A child path can not be associated with a file parent");
+ case DIRECTORY:
+ resourcePath = resource.getDirectory().getResourcePath();
+ if (!childPath.startsWith(resourcePath)) {
+ throw new Exception("Child path " + childPath + " is not in the parent path " + resourcePath);
+ }
+ resourcePath = childPath;
+ break;
+ case RESOURCE_NOT_SET:
+ throw new Exception("Resource was not set in resource with id " + resource.getResourceId());
+ }
+
+ return fetchInputStreamJCraft(escapeSpecialChars(resourcePath));
+ }
+
+ private InputStream fetchInputStreamJCraft(String resourcePath) throws Exception{
+ String command = "scp -f " + resourcePath;
+ channel = session.openChannel("exec");
+ ((ChannelExec) channel).setCommand(command);
+
+ // get I/O streams for remote scp
+ out = channel.getOutputStream();
+ in = channel.getInputStream();
+
+ channel.connect();
+
+ // send '\0'
+ buf[0] = 0;
+ out.write(buf, 0, 1);
+ out.flush();
+
+ while (true) {
+ int c = checkAck(in);
+ if (c != 'C') {
+ break;
+ }
+
+ // read '0644 '
+ in.read(buf, 0, 5);
+
+ long filesize = 0L;
+ while (true) {
+ if (in.read(buf, 0, 1) < 0) {
+ // error
+ break;
+ }
+ if (buf[0] == ' ') break;
+ filesize = filesize * 10L + (long) (buf[0] - '0');
+ }
+
+ String file = null;
+ for (int i = 0; ; i++) {
+ in.read(buf, i, 1);
+ if (buf[i] == (byte) 0x0a) {
+ file = new String(buf, 0, i);
+ break;
+ }
+ }
+
+ logger.info("file-size=" + filesize + ", file=" + file);
+ // send '\0'
+ buf[0] = 0;
+ out.write(buf, 0, 1);
+ out.flush();
+
+ // read a content of lfile
+ return new LimitInputStream(in, filesize);
+ }
+ return null;
+ }
+
+ @Override
+ public void complete() throws Exception {
+ int ack = checkAck(in);
+ if (ack != 0)
+ throw new IOException("Error code found in ack " + ack);
+
+ // send '\0'
+ buf[0] = 0;
+ out.write(buf, 0, 1);
+ out.flush();
+
+ channel.disconnect();
+ session.disconnect();
+ }
+
+ private int checkAck(InputStream in) throws IOException {
+ int b = in.read();
+ // b may be 0 for success,
+ // 1 for error,
+ // 2 for fatal error,
+ // -1
+ if (b == 0) return b;
+ if (b == -1) return b;
+
+ if (b == 1 || b == 2) {
+ StringBuffer sb = new StringBuffer();
+ int c;
+ do {
+ c = in.read();
+ sb.append((char) c);
+ }
+ while (c != '\n');
+ if (b == 1) { // error
+ logger.error("Check Ack Failure b = 1 " + sb.toString());
+ }
+ if (b == 2) { // fatal error
+ logger.error("Check Ack Failure b = 2 " + sb.toString());
+ }
+ }
+ return b;
+ }
+}
diff --git a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPMetadataCollector.java b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPMetadataCollector.java
index 1468479..882f8d2 100644
--- a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPMetadataCollector.java
+++ b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPMetadataCollector.java
@@ -120,27 +120,38 @@
public FileResourceMetadata getFileResourceMetadata(AuthToken authZToken, String resourceId, String credentialToken) throws Exception {
checkInitialized();
- ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
- GenericResource scpResource = resourceClient.get()
- .getGenericResource(GenericResourceGetRequest.newBuilder()
- .setAuthzToken(authZToken).setResourceId(resourceId).build());
- SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
- SCPSecret scpSecret = secretClient.scp().getSCPSecret(SCPSecretGetRequest.newBuilder()
- .setAuthzToken(authZToken).setSecretId(credentialToken).build());
+ GenericResource scpResource;
+ SCPSecret scpSecret;
+ try (ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort)) {
+ scpResource = resourceClient.get()
+ .getGenericResource(GenericResourceGetRequest.newBuilder()
+ .setAuthzToken(authZToken).setResourceId(resourceId).build());
+ }
+
+ try (SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort)) {
+ scpSecret = secretClient.scp().getSCPSecret(SCPSecretGetRequest.newBuilder()
+ .setAuthzToken(authZToken).setSecretId(credentialToken).build());
+ }
return getFileResourceMetadata(authZToken,scpResource, scpSecret);
}
@Override
public FileResourceMetadata getFileResourceMetadata(AuthToken authZToken, String parentResourceId, String childResourcePath, String credentialToken) throws Exception {
- ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
- GenericResource resource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder()
- .setAuthzToken(authZToken)
- .setResourceId(parentResourceId).build());
- SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
- SCPSecret scpSecret = secretClient.scp().getSCPSecret(SCPSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
+ GenericResource resource;
+ SCPSecret scpSecret;
+ try (ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort)) {
+ resource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder()
+ .setAuthzToken(authZToken)
+ .setResourceId(parentResourceId).build());
+ }
+
+ try (SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort)) {
+ scpSecret = secretClient.scp().getSCPSecret(SCPSecretGetRequest.newBuilder()
+ .setAuthzToken(authZToken).setSecretId(credentialToken).build());
+ }
boolean isChildPath = false;
if (childResourcePath != null && !"".equals(childResourcePath)) {
@@ -148,9 +159,9 @@
}
String resourcePath = null;
- switch (resource.getResourceCase()){
+ switch (resource.getResourceCase()) {
case FILE:
- if (isChildPath){
+ if (isChildPath) {
throw new Exception("A child path can not be associated with a file parent");
}
resourcePath = resource.getFile().getResourcePath();
@@ -170,9 +181,9 @@
}
GenericResource scpResource2 = GenericResource.newBuilder()
- .setFile(FileResource.newBuilder()
- .setResourcePath(resourcePath).build())
- .setScpStorage(resource.getScpStorage()).build();
+ .setFile(FileResource.newBuilder()
+ .setResourcePath(resourcePath).build())
+ .setScpStorage(resource.getScpStorage()).build();
return getFileResourceMetadata(authZToken, scpResource2, scpSecret);
}
@@ -201,7 +212,7 @@
}
if (rri.isRegularFile()) {
- FileResourceMetadata.Builder childFileBuilder = FileResourceMetadata.Builder.getBuilder()
+ FileResourceMetadata.Builder childFileBuilder = FileResourceMetadata.Builder.newBuilder()
.withFriendlyName(rri.getName())
.withResourcePath(rri.getPath())
.withCreatedTime(rri.getAttributes().getAtime())
@@ -224,25 +235,36 @@
@Override
public DirectoryResourceMetadata getDirectoryResourceMetadata(AuthToken authZToken, String resourceId, String credentialToken) throws Exception {
- ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
- GenericResource scpPResource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder()
- .setAuthzToken(authZToken)
- .setResourceId(resourceId).build());
+ GenericResource resource;
+ SCPSecret scpSecret;
+ try (ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort)) {
+ resource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder()
+ .setAuthzToken(authZToken)
+ .setResourceId(resourceId).build());
+ }
- SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
- SCPSecret scpSecret = secretClient.scp().getSCPSecret(SCPSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
+ try (SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort)) {
+ scpSecret = secretClient.scp().getSCPSecret(SCPSecretGetRequest.newBuilder()
+ .setAuthzToken(authZToken).setSecretId(credentialToken).build());
+ }
- return getDirectoryResourceMetadata(authZToken,scpPResource, scpSecret);
+ return getDirectoryResourceMetadata(authZToken, resource, scpSecret);
}
@Override
public DirectoryResourceMetadata getDirectoryResourceMetadata(AuthToken authZToken, String parentResourceId, String childResourcePath, String credentialToken) throws Exception {
- ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
- GenericResource resource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder()
- .setAuthzToken(authZToken).setResourceId(parentResourceId).build());
- SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
- SCPSecret scpSecret = secretClient.scp().getSCPSecret(SCPSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
+ GenericResource resource;
+ SCPSecret scpSecret;
+ try (ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort)) {
+ resource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder()
+ .setAuthzToken(authZToken).setResourceId(parentResourceId).build());
+ }
+
+ try (SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort)) {
+ scpSecret = secretClient.scp().getSCPSecret(SCPSecretGetRequest.newBuilder()
+ .setAuthzToken(authZToken).setSecretId(credentialToken).build());
+ }
boolean isChildPath = false;
if (childResourcePath != null && !"".equals(childResourcePath)) {
@@ -283,34 +305,44 @@
public Boolean isAvailable(AuthToken authZToken, String resourceId, String credentialToken) throws Exception {
checkInitialized();
- ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
- GenericResource scpResource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder()
- .setAuthzToken(authZToken).setResourceId(resourceId).build());
- return isAvailable(authZToken, scpResource, credentialToken);
+ GenericResource resource;
+ try (ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort)) {
+ resource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder()
+ .setAuthzToken(authZToken).setResourceId(resourceId).build());
+ }
+
+ return isAvailable(authZToken, resource, credentialToken);
}
@Override
public Boolean isAvailable(AuthToken authToken, String parentResourceId, String resourcePath, String credentialToken) throws Exception {
checkInitialized();
- ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
- GenericResource scpResource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder()
- .setAuthzToken(authToken).setResourceId(parentResourceId).build());
- validateParent(scpResource, resourcePath);
+ GenericResource resource;
+ try (ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort)) {
+ resource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder()
+ .setAuthzToken(authToken).setResourceId(parentResourceId).build());
+ }
+
+ validateParent(resource, resourcePath);
GenericResource targetScpResource = GenericResource.newBuilder()
.setFile(FileResource.newBuilder().setResourcePath(resourcePath).build())
- .setScpStorage(scpResource.getScpStorage()).build();
+ .setScpStorage(resource.getScpStorage()).build();
return isAvailable(authToken, targetScpResource, credentialToken);
}
public Boolean isAvailable(AuthToken authToken, GenericResource scpResource, String credentialToken) throws Exception {
- SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
- SCPSecret scpSecret = secretClient.scp().getSCPSecret(SCPSecretGetRequest.newBuilder()
- .setAuthzToken(authToken).setSecretId(credentialToken).build());
+
+ SCPSecret scpSecret;
+
+ try (SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort)) {
+ scpSecret = secretClient.scp().getSCPSecret(SCPSecretGetRequest.newBuilder()
+ .setAuthzToken(authToken).setSecretId(credentialToken).build());
+ }
try (SSHClient sshClient = getSSHClient(scpResource, scpSecret)) {
logger.info("Checking the availability of file {}", scpResource.getFile().getResourcePath());
diff --git a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPOutgoingConnector.java b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPOutgoingConnector.java
new file mode 100644
index 0000000..28ea5e0
--- /dev/null
+++ b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPOutgoingConnector.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.transport.scp;
+
+import com.jcraft.jsch.Channel;
+import com.jcraft.jsch.ChannelExec;
+import com.jcraft.jsch.Session;
+import org.apache.airavata.mft.core.api.ConnectorConfig;
+import org.apache.airavata.mft.core.api.OutgoingStreamingConnector;
+import org.apache.airavata.mft.credential.stubs.scp.SCPSecret;
+import org.apache.airavata.mft.credential.stubs.scp.SCPSecretGetRequest;
+import org.apache.airavata.mft.resource.client.ResourceServiceClient;
+import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
+import org.apache.airavata.mft.resource.stubs.common.GenericResource;
+import org.apache.airavata.mft.resource.stubs.common.GenericResourceGetRequest;
+import org.apache.airavata.mft.resource.stubs.scp.storage.SCPStorage;
+import org.apache.airavata.mft.secret.client.SecretServiceClient;
+import org.apache.airavata.mft.secret.client.SecretServiceClientBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+public final class SCPOutgoingConnector implements OutgoingStreamingConnector {
+
+ private static final Logger logger = LoggerFactory.getLogger(SCPOutgoingConnector.class);
+
+ private GenericResource resource;
+ private Session session;
+ private OutputStream out;
+ private InputStream in;
+ private Channel channel;
+ private ConnectorConfig cc;
+ private final byte[] buf = new byte[1024];
+
+
+ @Override
+ public void init(ConnectorConfig cc) throws Exception {
+
+ this.cc = cc;
+ try (ResourceServiceClient resourceClient = ResourceServiceClientBuilder
+ .buildClient(cc.getResourceServiceHost(), cc.getResourceServicePort())) {
+
+ resource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder()
+ .setAuthzToken(cc.getAuthToken())
+ .setResourceId(cc.getResourceId()).build());
+ }
+
+ if (resource.getStorageCase() != GenericResource.StorageCase.SCPSTORAGE) {
+ logger.error("Invalid storage type {} specified for resource {}", resource.getStorageCase(), cc.getResourceId());
+ throw new Exception("Invalid storage type specified for resource " + cc.getResourceId());
+ }
+
+ SCPSecret scpSecret;
+
+ try (SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(
+ cc.getSecretServiceHost(), cc.getSecretServicePort())) {
+
+ scpSecret = secretClient.scp().getSCPSecret(SCPSecretGetRequest.newBuilder()
+ .setAuthzToken(cc.getAuthToken())
+ .setSecretId(cc.getCredentialToken()).build());
+ }
+
+ SCPStorage scpStorage = resource.getScpStorage();
+ logger.info("Creating a ssh session for {}@{}:{}", scpSecret.getUser(), scpStorage.getHost(), scpStorage.getPort());
+
+ this.session = SCPTransportUtil.createSession(
+ scpSecret.getUser(),
+ scpStorage.getHost(),
+ scpStorage.getPort(),
+ scpSecret.getPrivateKey().getBytes(),
+ scpSecret.getPublicKey().getBytes(),
+ scpSecret.getPassphrase().equals("")? null : scpSecret.getPassphrase().getBytes());
+
+ if (session == null) {
+ logger.error("Session can not be null. Make sure that SCP Receiver is properly initialized");
+ throw new Exception("Session can not be null. Make sure that SCP Receiver is properly initialized");
+ }
+ }
+
+ private String escapeSpecialChars(String path) {
+ return path.replace(" ", "\\ ");
+ }
+
+ @Override
+ public OutputStream fetchOutputStream() throws Exception {
+ String resourcePath = null;
+ switch (resource.getResourceCase()){
+ case FILE:
+ resourcePath = resource.getFile().getResourcePath();
+ break;
+ case DIRECTORY:
+ throw new Exception("A directory path can not be streamed");
+ case RESOURCE_NOT_SET:
+ throw new Exception("Resource was not set in resource with id " + resource.getResourceId());
+ }
+
+ return fetchOutputStreamJCraft(escapeSpecialChars(resourcePath), cc.getMetadata().getResourceSize());
+ }
+
+ @Override
+ public OutputStream fetchOutputStream(String childPath) throws Exception {
+ String resourcePath = null;
+ switch (resource.getResourceCase()){
+ case FILE:
+ throw new Exception("A child path can not be associated with a file parent");
+ case DIRECTORY:
+ resourcePath = resource.getDirectory().getResourcePath();
+ if (!childPath.startsWith(resourcePath)) {
+ throw new Exception("Child path " + childPath + " is not in the parent path " + resourcePath);
+ }
+ resourcePath = childPath;
+ break;
+ case RESOURCE_NOT_SET:
+ throw new Exception("Resource was not set in resource with id " + resource.getResourceId());
+ }
+
+ return fetchOutputStreamJCraft(escapeSpecialChars(resourcePath), cc.getMetadata().getResourceSize());
+ }
+
+ public OutputStream fetchOutputStreamJCraft(String resourcePath, long fileSize) throws Exception {
+ boolean ptimestamp = true;
+
+ // exec 'scp -t rfile' remotely
+ String command = "scp " + (ptimestamp ? "-p" : "") + " -t " + resourcePath;
+ channel = session.openChannel("exec");
+ ((ChannelExec) channel).setCommand(command);
+
+ // get I/O streams for remote scp
+ out = channel.getOutputStream();
+ in = channel.getInputStream();
+
+ channel.connect();
+
+ if (checkAck(in) != 0) {
+ throw new IOException("Error code found in ack " + (checkAck(in)));
+ }
+
+ if (ptimestamp) {
+ command = "T" + (System.currentTimeMillis() / 1000) + " 0";
+ // The access time should be sent here,
+ // but it is not accessible through the Java API, so mtime is reused
+ command += (" " + (System.currentTimeMillis() / 1000) + " 0\n");
+ out.write(command.getBytes());
+ out.flush();
+ if (checkAck(in) != 0) {
+ throw new IOException("Error code found in ack " + (checkAck(in)));
+ }
+ }
+
+ // send "C0644 filesize filename", where filename should not include '/'
+ command = "C0644 " + fileSize + " ";
+ if (resourcePath.lastIndexOf('/') > 0) {
+ command += resourcePath.substring(resourcePath.lastIndexOf('/') + 1);
+ } else {
+ command += resourcePath;
+ }
+
+ command += "\n";
+ out.write(command.getBytes());
+ out.flush();
+
+ if (checkAck(in) != 0) {
+ throw new IOException("Error code found in ack " + (checkAck(in)));
+ }
+
+ return out;
+ }
+
+ @Override
+ public void complete() throws Exception {
+ buf[0] = 0;
+ out.write(buf, 0, 1);
+ out.flush();
+
+ if (checkAck(in) != 0) {
+ throw new IOException("Error code found in ack " + (checkAck(in)));
+ }
+ out.close();
+ channel.disconnect();
+ session.disconnect();
+ }
+
+ public int checkAck(InputStream in) throws IOException {
+ int b = in.read();
+ // b may be 0 for success,
+ // 1 for error,
+ // 2 for fatal error,
+ // -1
+ if (b == 0) return b;
+ if (b == -1) return b;
+
+ if (b == 1 || b == 2) {
+ StringBuffer sb = new StringBuffer();
+ int c;
+ do {
+ c = in.read();
+ sb.append((char) c);
+ }
+ while (c != '\n');
+ if (b == 1) { // error
+ logger.error("Check Ack Failure b = 1 " + sb.toString());
+ }
+ if (b == 2) { // fatal error
+ logger.error("Check Ack Failure b = 2 " + sb.toString());
+ }
+ }
+ return b;
+ }
+}
diff --git a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPReceiver.java b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPReceiver.java
deleted file mode 100644
index dab47d2..0000000
--- a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPReceiver.java
+++ /dev/null
@@ -1,279 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.airavata.mft.transport.scp;
-
-import com.jcraft.jsch.ChannelExec;
-import com.jcraft.jsch.Session;
-import org.apache.airavata.mft.common.AuthToken;
-import org.apache.airavata.mft.core.ConnectorContext;
-import org.apache.airavata.mft.core.DoubleStreamingBuffer;
-import org.apache.airavata.mft.core.api.Connector;
-import org.apache.airavata.mft.credential.stubs.scp.SCPSecret;
-import org.apache.airavata.mft.credential.stubs.scp.SCPSecretGetRequest;
-import org.apache.airavata.mft.resource.client.ResourceServiceClient;
-import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
-import org.apache.airavata.mft.resource.stubs.common.GenericResource;
-import org.apache.airavata.mft.resource.stubs.common.GenericResourceGetRequest;
-import org.apache.airavata.mft.resource.stubs.scp.storage.SCPStorage;
-import org.apache.airavata.mft.secret.client.SecretServiceClient;
-import org.apache.airavata.mft.secret.client.SecretServiceClientBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-public class SCPReceiver implements Connector {
-
- private static final Logger logger = LoggerFactory.getLogger(SCPReceiver.class);
-
- boolean initialized = false;
-
- private Session session;
- private String resourceServiceHost;
- private int resourceServicePort;
- private String secretServiceHost;
- private int secretServicePort;
-
- public void init(String resourceServiceHost, int resourceServicePort,
- String secretServiceHost, int secretServicePort) throws Exception {
-
- this.resourceServiceHost = resourceServiceHost;
- this.resourceServicePort = resourceServicePort;
- this.secretServiceHost = secretServiceHost;
- this.secretServicePort = secretServicePort;
-
- if (initialized) {
- destroy();
- }
-
- this.initialized = true;
- }
-
- public void destroy() {
- try {
- this.session.disconnect();
- } catch (Exception e) {
- logger.error("Errored while disconnecting session", e);
- }
- }
-
- private void checkInitialized() {
- if (!initialized) {
- throw new IllegalStateException("SCP Receiver is not initialized");
- }
- }
-
- public void startStream(AuthToken authToken, String resourceId, String credentialToken, ConnectorContext context) throws Exception {
- startStream(authToken, resourceId, null, credentialToken, context);
- }
-
- @Override
- public void startStream(AuthToken authToken, String resourceId, String childResourcePath, String credentialToken,
- ConnectorContext context) throws Exception {
- checkInitialized();
-
- ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
- GenericResource resource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder()
- .setAuthzToken(authToken).setResourceId(resourceId).build());
-
- if (resource.getStorageCase() != GenericResource.StorageCase.SCPSTORAGE) {
- logger.error("Invalid storage type {} specified for resource {}", resource.getStorageCase(), resourceId);
- throw new Exception("Invalid storage type specified for resource " + resourceId);
- }
-
- SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
- SCPSecret scpSecret = secretClient.scp().getSCPSecret(SCPSecretGetRequest.newBuilder()
- .setAuthzToken(authToken)
- .setSecretId(credentialToken).build());
-
- SCPStorage scpStorage = resource.getScpStorage();
- logger.info("Creating a ssh session for {}@{}:{}", scpSecret.getUser(), scpStorage.getHost(), scpStorage.getPort());
-
- this.session = SCPTransportUtil.createSession(
- scpSecret.getUser(),
- scpStorage.getHost(),
- scpStorage.getPort(),
- scpSecret.getPrivateKey().getBytes(),
- scpSecret.getPublicKey().getBytes(),
- scpSecret.getPassphrase().equals("")? null : scpSecret.getPassphrase().getBytes());
-
- if (session == null) {
- logger.error("Session can not be null. Make sure that SCP Receiver is properly initialized");
- throw new Exception("Session can not be null. Make sure that SCP Receiver is properly initialized");
- }
-
- boolean isChildPath = false;
- if (childResourcePath != null && !"".equals(childResourcePath)) {
- isChildPath = true;
- }
-
- String resourcePath = null;
- switch (resource.getResourceCase()){
- case FILE:
- if (isChildPath){
- throw new Exception("A child path can not be associated with a file parent");
- }
- resourcePath = resource.getFile().getResourcePath();
- break;
- case DIRECTORY:
- resourcePath = resource.getDirectory().getResourcePath();
- if (isChildPath) {
- if (!childResourcePath.startsWith(resourcePath)) {
- throw new Exception("Child path " + childResourcePath + " is not in the parent path " + resourcePath);
- }
- resourcePath = childResourcePath;
- }
-
- break;
- case RESOURCE_NOT_SET:
- throw new Exception("Resource was not set in resource with id " + resourceId);
- }
-
- transferRemoteToStream(session, resourcePath, context.getStreamBuffer());
- logger.info("SCP Receive completed. Transfer {}", context.getTransferId());
- }
-
- private void transferRemoteToStream(Session session, String from, DoubleStreamingBuffer streamBuffer) throws Exception {
-
- try {
- OutputStream outputStream = streamBuffer.getOutputStream();
-
- logger.info("Starting scp receive");
- // exec 'scp -f rfile' remotely
- String command = "scp -f " + from;
- com.jcraft.jsch.Channel channel = session.openChannel("exec");
- ((ChannelExec) channel).setCommand(command);
-
- // get I/O streams for remote scp
- OutputStream out = channel.getOutputStream();
- InputStream in = channel.getInputStream();
-
- channel.connect();
-
- byte[] buf = new byte[1024];
-
- // send '\0'
- buf[0] = 0;
- out.write(buf, 0, 1);
- out.flush();
-
- while (true) {
- int c = checkAck(in);
- if (c != 'C') {
- break;
- }
-
- // read '0644 '
- in.read(buf, 0, 5);
-
- long filesize = 0L;
- while (true) {
- if (in.read(buf, 0, 1) < 0) {
- // error
- break;
- }
- if (buf[0] == ' ') break;
- filesize = filesize * 10L + (long) (buf[0] - '0');
- }
-
- String file = null;
- for (int i = 0; ; i++) {
- in.read(buf, i, 1);
- if (buf[i] == (byte) 0x0a) {
- file = new String(buf, 0, i);
- break;
- }
- }
-
- logger.info("file-size=" + filesize + ", file=" + file);
- // send '\0'
- buf[0] = 0;
- out.write(buf, 0, 1);
- out.flush();
-
- // read a content of lfile
- int bufSize;
- while (true) {
- if (buf.length < filesize) bufSize = buf.length;
- else bufSize = (int) filesize;
- bufSize = in.read(buf, 0, bufSize);
- if (bufSize < 0) {
- // error
- break;
- }
- //System.out.println("Read " + bufSize);
- outputStream.write(buf, 0, bufSize);
- outputStream.flush();
-
- filesize -= bufSize;
- if (filesize == 0L) break;
- }
-
- if (checkAck(in) != 0) {
- throw new IOException("Error code found in ack " + (checkAck(in)));
- }
-
- // send '\0'
- buf[0] = 0;
- out.write(buf, 0, 1);
- out.flush();
- outputStream.close();
- }
-
- channel.disconnect();
- session.disconnect();
-
- } finally {
- try {
- session.disconnect();
- } catch (Exception e) {
- logger.warn("Session disconnection failed", e);
- }
- }
-
- }
-
- public int checkAck(InputStream in) throws IOException {
- int b = in.read();
- // b may be 0 for success,
- // 1 for error,
- // 2 for fatal error,
- // -1
- if (b == 0) return b;
- if (b == -1) return b;
-
- if (b == 1 || b == 2) {
- StringBuffer sb = new StringBuffer();
- int c;
- do {
- c = in.read();
- sb.append((char) c);
- }
- while (c != '\n');
- if (b == 1) { // error
- System.out.print(sb.toString());
- }
- if (b == 2) { // fatal error
- System.out.print(sb.toString());
- }
- }
- return b;
- }
-}
diff --git a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPSender.java b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPSender.java
deleted file mode 100644
index ce33646..0000000
--- a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPSender.java
+++ /dev/null
@@ -1,274 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.airavata.mft.transport.scp;
-
-import com.jcraft.jsch.ChannelExec;
-import com.jcraft.jsch.JSchException;
-import com.jcraft.jsch.Session;
-import org.apache.airavata.mft.common.AuthToken;
-import org.apache.airavata.mft.core.ConnectorContext;
-import org.apache.airavata.mft.core.DoubleStreamingBuffer;
-import org.apache.airavata.mft.core.api.Connector;
-import org.apache.airavata.mft.credential.stubs.scp.SCPSecret;
-import org.apache.airavata.mft.credential.stubs.scp.SCPSecretGetRequest;
-import org.apache.airavata.mft.resource.client.ResourceServiceClient;
-import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
-import org.apache.airavata.mft.resource.stubs.common.GenericResource;
-import org.apache.airavata.mft.resource.stubs.common.GenericResourceGetRequest;
-import org.apache.airavata.mft.resource.stubs.scp.storage.SCPStorage;
-import org.apache.airavata.mft.secret.client.SecretServiceClient;
-import org.apache.airavata.mft.secret.client.SecretServiceClientBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.*;
-
-public class SCPSender implements Connector {
-
- private static final Logger logger = LoggerFactory.getLogger(SCPSender.class);
-
- boolean initialized = false;
-
- private Session session;
- private String resourceServiceHost;
- private int resourceServicePort;
- private String secretServiceHost;
- private int secretServicePort;
-
- public void init(String resourceServiceHost, int resourceServicePort,
- String secretServiceHost, int secretServicePort) throws Exception {
-
- this.resourceServiceHost = resourceServiceHost;
- this.resourceServicePort = resourceServicePort;
- this.secretServiceHost = secretServiceHost;
- this.secretServicePort = secretServicePort;
-
- if (initialized) {
- destroy();
- }
- this.initialized = true;
- }
-
-
- public void destroy() {
-
- try {
- this.session.disconnect();
- } catch (Exception e) {
- logger.error("Errored while disconnecting session", e);
- }
- }
-
- private void checkInitialized() {
- if (!initialized) {
- throw new IllegalStateException("SCP Sender is not initialized");
- }
- }
-
- public void startStream(AuthToken authToken, String resourceId, String credentialToken, ConnectorContext context) throws Exception {
- startStream(authToken, resourceId, null, credentialToken, context);
- }
-
- @Override
- public void startStream(AuthToken authToken, String resourceId, String childResourcePath, String credentialToken, ConnectorContext context) throws Exception {
- checkInitialized();
-
- ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
- GenericResource resource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder()
- .setAuthzToken(authToken).setResourceId(resourceId).build());
-
- if (resource.getStorageCase() != GenericResource.StorageCase.SCPSTORAGE) {
- logger.error("Invalid storage type {} specified for resource {}", resource.getStorageCase(), resourceId);
- throw new Exception("Invalid storage type specified for resource " + resourceId);
- }
-
- SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
- SCPSecret scpSecret = secretClient.scp().getSCPSecret(SCPSecretGetRequest.newBuilder()
- .setAuthzToken(authToken)
- .setSecretId(credentialToken).build());
-
- SCPStorage scpStorage = resource.getScpStorage();
- logger.info("Creating a ssh session for {}@{}:{}", scpSecret.getUser(), scpStorage.getHost(), scpStorage.getPort());
-
- this.session = SCPTransportUtil.createSession(
- scpSecret.getUser(),
- scpStorage.getHost(),
- scpStorage.getPort(),
- scpSecret.getPrivateKey().getBytes(),
- scpSecret.getPublicKey().getBytes(),
- scpSecret.getPassphrase().equals("")? null : scpSecret.getPassphrase().getBytes());
-
- if (session == null) {
- System.out.println("Session can not be null. Make sure that SCP Sender is properly initialized");
- throw new Exception("Session can not be null. Make sure that SCP Sender is properly initialized");
- }
-
- boolean isChildPath = false;
- if (childResourcePath != null && !"".equals(childResourcePath)) {
- isChildPath = true;
- }
-
- String resourcePath = null;
- switch (resource.getResourceCase()){
- case FILE:
- if (isChildPath){
- throw new Exception("A child path can not be associated with a file parent");
- }
- resourcePath = resource.getFile().getResourcePath();
- break;
- case DIRECTORY:
- resourcePath = resource.getDirectory().getResourcePath();
- if (isChildPath) {
- if (!childResourcePath.startsWith(resourcePath)) {
- throw new Exception("Child path " + childResourcePath + " is not in the parent path " + resourcePath);
- }
- resourcePath = childResourcePath;
- }
-
- break;
- case RESOURCE_NOT_SET:
- throw new Exception("Resource was not set in resource with id " + resourceId);
- }
-
- try {
- copyLocalToRemote(this.session,
- resourcePath,
- context.getStreamBuffer(),
- context.getMetadata().getResourceSize());
- logger.info("SCP send to transfer {} completed", context.getTransferId());
-
- } catch (Exception e) {
- logger.error("Errored while streaming to remote scp server. Transfer {}", context.getTransferId() , e);
- throw e;
- }
- }
-
- private void copyLocalToRemote(Session session, String to, DoubleStreamingBuffer streamBuffer, long fileSize) throws JSchException, IOException {
- try {
- logger.info("Starting scp send for remote server");
- InputStream inputStream = streamBuffer.getInputStream();
-
- boolean ptimestamp = true;
-
- // exec 'scp -t rfile' remotely
- String command = "scp " + (ptimestamp ? "-p" : "") + " -t " + to;
- com.jcraft.jsch.Channel channel = session.openChannel("exec");
- ((ChannelExec) channel).setCommand(command);
-
- // get I/O streams for remote scp
- OutputStream out = channel.getOutputStream();
- InputStream in = channel.getInputStream();
-
- channel.connect();
-
- if (checkAck(in) != 0) {
- throw new IOException("Error code found in ack " + (checkAck(in)));
- }
-
- if (ptimestamp) {
- command = "T" + (System.currentTimeMillis() / 1000) + " 0";
- // The access time should be sent here,
- // but it is not accessible with JavaAPI ;-<
- command += (" " + (System.currentTimeMillis() / 1000) + " 0\n");
- out.write(command.getBytes());
- out.flush();
- if (checkAck(in) != 0) {
- throw new IOException("Error code found in ack " + (checkAck(in)));
- }
- }
-
- // send "C0644 filesize filename", where filename should not include '/'
- command = "C0644 " + fileSize + " ";
- if (to.lastIndexOf('/') > 0) {
- command += to.substring(to.lastIndexOf('/') + 1);
- } else {
- command += to;
- }
-
- command += "\n";
- out.write(command.getBytes());
- out.flush();
-
- if (checkAck(in) != 0) {
- throw new IOException("Error code found in ack " + (checkAck(in)));
- }
-
- // send a content of lfile
- byte[] buf = new byte[1024];
- long totalWritten = 0;
- while (true) {
- int len = inputStream.read(buf, 0, buf.length);
- if (len == -1) {
- break;
- } else {
- out.write(buf, 0, len); //out.flush();
- totalWritten += len;
- //System.out.println("Write " + totalWritten);
- if (totalWritten == fileSize) {
- break;
- }
- }
- }
-
- // send '\0'
- buf[0] = 0;
- out.write(buf, 0, 1);
- out.flush();
-
- if (checkAck(in) != 0) {
- throw new IOException("Error code found in ack " + (checkAck(in)));
- }
- out.close();
- channel.disconnect();
-
- } finally {
- try {
- session.disconnect();
- } catch (Exception e) {
- logger.warn("Session disconnection failed", e);
- }
- }
- }
-
- public int checkAck(InputStream in) throws IOException {
- int b = in.read();
- // b may be 0 for success,
- // 1 for error,
- // 2 for fatal error,
- // -1
- if (b == 0) return b;
- if (b == -1) return b;
-
- if (b == 1 || b == 2) {
- StringBuffer sb = new StringBuffer();
- int c;
- do {
- c = in.read();
- sb.append((char) c);
- }
- while (c != '\n');
- if (b == 1) { // error
- System.out.print(sb.toString());
- }
- if (b == 2) { // fatal error
- System.out.print(sb.toString());
- }
- }
- return b;
- }
-}