Making storage as first class entity and supporting http transport in the agent
diff --git a/admin/src/main/java/org/apache/airavata/mft/admin/models/TransferCommand.java b/admin/src/main/java/org/apache/airavata/mft/admin/models/TransferCommand.java
index 175e867..b74b90a 100644
--- a/admin/src/main/java/org/apache/airavata/mft/admin/models/TransferCommand.java
+++ b/admin/src/main/java/org/apache/airavata/mft/admin/models/TransferCommand.java
@@ -22,12 +22,14 @@
public class TransferCommand {
private String transferId;
- private String sourceId;
+ private String sourceStorageId;
+ private String sourcePath;
private String sourceType;
private String sourceToken;
private String sourceResourceBackend;
private String sourceCredentialBackend;
- private String destinationId;
+ private String destinationStorageId;
+ private String destinationPath;
private String destinationType;
private String destinationToken;
private String destResourceBackend;
@@ -42,12 +44,21 @@
return this;
}
- public String getSourceId() {
- return sourceId;
+ public String getSourceStorageId() {
+ return sourceStorageId;
}
- public TransferCommand setSourceId(String sourceId) {
- this.sourceId = sourceId;
+ public TransferCommand setSourceStorageId(String sourceStorageId) {
+ this.sourceStorageId = sourceStorageId;
+ return this;
+ }
+
+ public String getSourcePath() {
+ return sourcePath;
+ }
+
+ public TransferCommand setSourcePath(String sourcePath) {
+ this.sourcePath = sourcePath;
return this;
}
@@ -87,12 +98,21 @@
return this;
}
- public String getDestinationId() {
- return destinationId;
+ public String getDestinationStorageId() {
+ return destinationStorageId;
}
- public TransferCommand setDestinationId(String destinationId) {
- this.destinationId = destinationId;
+ public TransferCommand setDestinationStorageId(String destinationStorageId) {
+ this.destinationStorageId = destinationStorageId;
+ return this;
+ }
+
+ public String getDestinationPath() {
+ return destinationPath;
+ }
+
+ public TransferCommand setDestinationPath(String destinationPath) {
+ this.destinationPath = destinationPath;
return this;
}
diff --git a/admin/src/main/java/org/apache/airavata/mft/admin/models/TransferRequest.java b/admin/src/main/java/org/apache/airavata/mft/admin/models/TransferRequest.java
index ecd5cdc..8512713 100644
--- a/admin/src/main/java/org/apache/airavata/mft/admin/models/TransferRequest.java
+++ b/admin/src/main/java/org/apache/airavata/mft/admin/models/TransferRequest.java
@@ -21,12 +21,14 @@
public class TransferRequest {
- private String sourceId;
+ private String sourceStorageId;
+ private String sourcePath;
private String sourceType;
private String sourceToken;
private String sourceResourceBackend;
private String sourceCredentialBackend;
- private String destinationId;
+ private String destinationStorageId;
+ private String destinationPath;
private String destinationType;
private String destinationToken;
private String destResourceBackend;
@@ -34,12 +36,21 @@
private boolean affinityTransfer;
private Map<String, Integer> targetAgents;
- public String getSourceId() {
- return sourceId;
+ public String getSourceStorageId() {
+ return sourceStorageId;
}
- public TransferRequest setSourceId(String sourceId) {
- this.sourceId = sourceId;
+ public TransferRequest setSourceStorageId(String sourceStorageId) {
+ this.sourceStorageId = sourceStorageId;
+ return this;
+ }
+
+ public String getSourcePath() {
+ return sourcePath;
+ }
+
+ public TransferRequest setSourcePath(String sourcePath) {
+ this.sourcePath = sourcePath;
return this;
}
@@ -79,12 +90,21 @@
return this;
}
- public String getDestinationId() {
- return destinationId;
+ public String getDestinationStorageId() {
+ return destinationStorageId;
}
- public TransferRequest setDestinationId(String destinationId) {
- this.destinationId = destinationId;
+ public TransferRequest setDestinationStorageId(String destinationStorageId) {
+ this.destinationStorageId = destinationStorageId;
+ return this;
+ }
+
+ public String getDestinationPath() {
+ return destinationPath;
+ }
+
+ public TransferRequest setDestinationPath(String destinationPath) {
+ this.destinationPath = destinationPath;
return this;
}
diff --git a/agent/pom.xml b/agent/pom.xml
index 7ca3753..7d6765e 100644
--- a/agent/pom.xml
+++ b/agent/pom.xml
@@ -99,6 +99,11 @@
<artifactId>log4j-over-slf4j</artifactId>
<version>${log4j.over.slf4j}</version>
</dependency>
+ <dependency>
+ <groupId>com.sun.activation</groupId>
+ <artifactId>javax.activation</artifactId>
+ <version>1.2.0</version>
+ </dependency>
</dependencies>
<build>
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/AppConfig.java b/agent/src/main/java/org/apache/airavata/mft/agent/AppConfig.java
index 4409232..49b3ddf 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/AppConfig.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/AppConfig.java
@@ -18,10 +18,10 @@
package org.apache.airavata.mft.agent;
import org.apache.airavata.mft.admin.MFTConsulClient;
+import org.apache.airavata.mft.agent.http.HttpTransferRequestsStore;
import org.apache.airavata.mft.agent.rpc.RPCParser;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
-import org.springframework.context.annotation.PropertySource;
@Configuration
public class AppConfig {
@@ -41,4 +41,9 @@
public RPCParser rpcParser() {
return new RPCParser();
}
+
+ @Bean
+ public HttpTransferRequestsStore transferRequestStore() {
+ return new HttpTransferRequestsStore();
+ }
}
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java b/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
index c7bd8f7..19a0dfb 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
@@ -30,6 +30,8 @@
import org.apache.airavata.mft.admin.models.TransferCommand;
import org.apache.airavata.mft.admin.models.TransferState;
import org.apache.airavata.mft.admin.models.rpc.SyncRPCRequest;
+import org.apache.airavata.mft.agent.http.HttpServer;
+import org.apache.airavata.mft.agent.http.HttpTransferRequestsStore;
import org.apache.airavata.mft.agent.rpc.RPCParser;
import org.apache.airavata.mft.core.ConnectorResolver;
import org.apache.airavata.mft.core.MetadataCollectorResolver;
@@ -101,6 +103,9 @@
@Autowired
private MFTConsulClient mftConsulClient;
+ @Autowired
+ private HttpTransferRequestsStore transferRequestsStore;
+
public void init() {
transferMessageCache = KVCache.newCache(mftConsulClient.getKvClient(), MFTConsulClient.AGENTS_TRANSFER_REQUEST_MESSAGE_PATH + agentId );
rpcMessageCache = KVCache.newCache(mftConsulClient.getKvClient(), MFTConsulClient.AGENTS_RPC_REQUEST_MESSAGE_PATH + agentId );
@@ -147,11 +152,11 @@
Optional<Connector> inConnectorOpt = ConnectorResolver.resolveConnector(request.getSourceType(), "IN");
Connector inConnector = inConnectorOpt.orElseThrow(() -> new Exception("Could not find an in connector for given input"));
- inConnector.init(request.getSourceId(), request.getSourceToken(), resourceServiceHost, resourceServicePort, secretServiceHost, secretServicePort);
+ inConnector.init(request.getSourceStorageId(), request.getSourceToken(), resourceServiceHost, resourceServicePort, secretServiceHost, secretServicePort);
Optional<Connector> outConnectorOpt = ConnectorResolver.resolveConnector(request.getDestinationType(), "OUT");
Connector outConnector = outConnectorOpt.orElseThrow(() -> new Exception("Could not find an out connector for given input"));
- outConnector.init(request.getDestinationId(), request.getDestinationToken(), resourceServiceHost, resourceServicePort, secretServiceHost, secretServicePort);
+ outConnector.init(request.getDestinationStorageId(), request.getDestinationToken(), resourceServiceHost, resourceServicePort, secretServiceHost, secretServicePort);
Optional<MetadataCollector> srcMetadataCollectorOp = MetadataCollectorResolver.resolveMetadataCollector(request.getSourceType());
MetadataCollector srcMetadataCollector = srcMetadataCollectorOp.orElseThrow(() -> new Exception("Could not find a metadata collector for source"));
@@ -221,6 +226,19 @@
transferMessageCache.start();
}
+ private void acceptHTTPRequests() {
+ logger.info("Starting the HTTP front end");
+
+ new Thread(() -> {
+ HttpServer httpServer = new HttpServer(3333, false, transferRequestsStore);
+ try {
+ httpServer.run();
+ } catch (Exception e) {
+ logger.error("Http frontend server start failed", e);
+ }
+ }).start();
+ }
+
private boolean connectAgent() throws MFTConsulClientException {
final ImmutableSession session = ImmutableSession.builder()
.name(agentId)
@@ -316,6 +334,7 @@
acceptTransferRequests();
acceptRPCRequests();
+ acceptHTTPRequests();
}
@Override
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java b/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java
index cd90503..c1ac253 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java
@@ -52,7 +52,10 @@
MetadataCollector destMetadataCollector, BiConsumer<String, TransferState> onStatusCallback,
BiConsumer<String, Boolean> exitingCallback) throws Exception {
- FileResourceMetadata srcMetadata = srcMetadataCollector.getFileResourceMetadata(command.getSourceId(), command.getSourceToken());
+ FileResourceMetadata srcMetadata = srcMetadataCollector.getFileResourceMetadata(
+ command.getSourceStorageId(),
+ command.getSourcePath(),
+ command.getSourceToken());
final long resourceSize = srcMetadata.getResourceSize();
logger.debug("Source file size {}. MD5 {}", resourceSize, srcMetadata.getMd5sum());
@@ -65,8 +68,8 @@
context.setStreamBuffer(streamBuffer);
context.setTransferId(command.getTransferId());
- TransferTask recvTask = new TransferTask(inConnector, context);
- TransferTask sendTask = new TransferTask(outConnector, context);
+ TransferTask recvTask = new TransferTask(inConnector, context, command.getSourcePath());
+ TransferTask sendTask = new TransferTask(outConnector, context, command.getDestinationPath());
List<Future<Integer>> futureList = new ArrayList<>();
ExecutorCompletionService<Integer> completionService = new ExecutorCompletionService<>(executor);
@@ -121,14 +124,19 @@
}
if (!transferErrored) {
- Boolean transferred = destMetadataCollector.isAvailable(command.getDestinationId(), command.getDestinationToken());
+ Boolean transferred = destMetadataCollector.isAvailable(
+ command.getDestinationStorageId(),
+ command.getDestinationPath(),
+ command.getDestinationToken());
if (!transferred) {
logger.error("Transfer completed but resource is not available in destination");
throw new Exception("Transfer completed but resource is not available in destination");
}
- FileResourceMetadata destMetadata = destMetadataCollector.getFileResourceMetadata(command.getDestinationId(),
+ FileResourceMetadata destMetadata = destMetadataCollector.getFileResourceMetadata(
+ command.getDestinationStorageId(),
+ command.getDestinationPath(),
command.getDestinationToken());
boolean doIntegrityVerify = true;
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/http/ConnectorParams.java b/agent/src/main/java/org/apache/airavata/mft/agent/http/ConnectorParams.java
new file mode 100644
index 0000000..bb61ebd
--- /dev/null
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/http/ConnectorParams.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.agent.http;
+
+public class ConnectorParams {
+
+ private String storageId, credentialToken, resourceServiceHost, secretServiceHost;
+ private int resourceServicePort, secretServicePort;
+
+ public String getStorageId() {
+ return storageId;
+ }
+
+ public ConnectorParams setStorageId(String storageId) {
+ this.storageId = storageId;
+ return this;
+ }
+
+ public String getCredentialToken() {
+ return credentialToken;
+ }
+
+ public ConnectorParams setCredentialToken(String credentialToken) {
+ this.credentialToken = credentialToken;
+ return this;
+ }
+
+ public String getResourceServiceHost() {
+ return resourceServiceHost;
+ }
+
+ public ConnectorParams setResourceServiceHost(String resourceServiceHost) {
+ this.resourceServiceHost = resourceServiceHost;
+ return this;
+ }
+
+ public String getSecretServiceHost() {
+ return secretServiceHost;
+ }
+
+ public ConnectorParams setSecretServiceHost(String secretServiceHost) {
+ this.secretServiceHost = secretServiceHost;
+ return this;
+ }
+
+ public int getResourceServicePort() {
+ return resourceServicePort;
+ }
+
+ public ConnectorParams setResourceServicePort(int resourceServicePort) {
+ this.resourceServicePort = resourceServicePort;
+ return this;
+ }
+
+ public int getSecretServicePort() {
+ return secretServicePort;
+ }
+
+ public ConnectorParams setSecretServicePort(int secretServicePort) {
+ this.secretServicePort = secretServicePort;
+ return this;
+ }
+}
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpDownloadRequest.java b/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpDownloadRequest.java
new file mode 100644
index 0000000..2cf56af
--- /dev/null
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpDownloadRequest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.agent.http;
+
+import org.apache.airavata.mft.core.api.Connector;
+import org.apache.airavata.mft.core.api.MetadataCollector;
+
+public class HttpDownloadRequest {
+
+ private ConnectorParams connectorParams;
+ private Connector srcConnector;
+ private MetadataCollector srcMetadataCollector;
+ private String srcResourceId;
+ private String srcToken;
+
+ public ConnectorParams getConnectorParams() {
+ return connectorParams;
+ }
+
+ public HttpDownloadRequest setConnectorParams(ConnectorParams connectorParams) {
+ this.connectorParams = connectorParams;
+ return this;
+ }
+
+ public Connector getSrcConnector() {
+ return srcConnector;
+ }
+
+ public HttpDownloadRequest setSrcConnector(Connector srcConnector) {
+ this.srcConnector = srcConnector;
+ return this;
+ }
+
+ public MetadataCollector getSrcMetadataCollector() {
+ return srcMetadataCollector;
+ }
+
+ public HttpDownloadRequest setSrcMetadataCollector(MetadataCollector srcMetadataCollector) {
+ this.srcMetadataCollector = srcMetadataCollector;
+ return this;
+ }
+
+ public String getSrcResourceId() {
+ return srcResourceId;
+ }
+
+ public HttpDownloadRequest setSrcResourceId(String srcResourceId) {
+ this.srcResourceId = srcResourceId;
+ return this;
+ }
+
+ public String getSrcToken() {
+ return srcToken;
+ }
+
+ public HttpDownloadRequest setSrcToken(String srcToken) {
+ this.srcToken = srcToken;
+ return this;
+ }
+}
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpServer.java b/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpServer.java
new file mode 100644
index 0000000..4cc0d8a
--- /dev/null
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpServer.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.agent.http;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.logging.LogLevel;
+import io.netty.handler.logging.LoggingHandler;
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.SslProvider;
+import io.netty.handler.ssl.util.SelfSignedCertificate;
+
+public class HttpServer {
+
+ private final int port;
+ private final boolean enableSSL;
+ private HttpTransferRequestsStore transferRequestsStore;
+
+ public HttpServer(int port, boolean enableSSL, HttpTransferRequestsStore transferRequestsStore) {
+ this.port = port;
+ this.enableSSL = enableSSL;
+ this.transferRequestsStore = transferRequestsStore;
+ }
+
+ public void run() throws Exception {
+ // Configure SSL.
+ final SslContext sslCtx;
+ if (enableSSL) {
+ SelfSignedCertificate ssc = new SelfSignedCertificate();
+ sslCtx = SslContext.newServerContext(SslProvider.JDK, ssc.certificate(), ssc.privateKey());
+ } else {
+ sslCtx = null;
+ }
+
+ EventLoopGroup bossGroup = new NioEventLoopGroup(1);
+ EventLoopGroup workerGroup = new NioEventLoopGroup();
+ try {
+ ServerBootstrap b = new ServerBootstrap();
+ b.group(bossGroup, workerGroup)
+ .channel(NioServerSocketChannel.class)
+ .handler(new LoggingHandler(LogLevel.INFO))
+ .childHandler(new HttpServerInitializer(sslCtx, transferRequestsStore));
+
+ Channel ch = b.bind(port).sync().channel();
+
+ System.err.println("Open your web browser and navigate to " +
+ (enableSSL? "https" : "http") + "://127.0.0.1:" + port + '/');
+
+ ch.closeFuture().sync();
+ } finally {
+ bossGroup.shutdownGracefully();
+ workerGroup.shutdownGracefully();
+ }
+ }
+}
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpServerHandler.java b/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpServerHandler.java
new file mode 100644
index 0000000..57ee4f0
--- /dev/null
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpServerHandler.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.agent.http;
+
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelProgressiveFuture;
+import io.netty.channel.ChannelProgressiveFutureListener;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.codec.http.*;
+import io.netty.handler.stream.ChunkedStream;
+import io.netty.util.CharsetUtil;
+import org.apache.airavata.mft.core.ConnectorContext;
+import org.apache.airavata.mft.core.DoubleStreamingBuffer;
+import org.apache.airavata.mft.core.FileResourceMetadata;
+import org.apache.airavata.mft.core.TransferTask;
+import org.apache.airavata.mft.core.api.Connector;
+import org.apache.airavata.mft.core.api.MetadataCollector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.activation.MimetypesFileTypeMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import static io.netty.handler.codec.http.HttpMethod.*;
+import static io.netty.handler.codec.http.HttpResponseStatus.*;
+import static io.netty.handler.codec.http.HttpVersion.*;
+
+public class HttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
+
+ private static final Logger logger = LoggerFactory.getLogger(HttpServerHandler.class);
+
+ private final HttpTransferRequestsStore transferRequestsStore;
+ private final ExecutorService executor = Executors.newFixedThreadPool(10);
+
+ public HttpServerHandler(HttpTransferRequestsStore transferRequestsStore) {
+ this.transferRequestsStore = transferRequestsStore;
+ }
+
+ @Override
+ public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
+ if (!request.decoderResult().isSuccess()) {
+ sendError(ctx, BAD_REQUEST);
+ return;
+ }
+
+ if (request.method() != GET) {
+ sendError(ctx, METHOD_NOT_ALLOWED);
+ return;
+ }
+
+ final String uri = request.uri().substring(1);
+ logger.info("Received download request through url {}", uri);
+
+ HttpTransferRequest httpTransferRequest = transferRequestsStore.getDownloadRequest(uri);
+
+ if (httpTransferRequest == null) {
+ logger.error("Couldn't find transfer request for uri {}", uri);
+ sendError(ctx, NOT_FOUND);
+ return;
+ }
+
+ Connector connector = httpTransferRequest.getOtherConnector();
+ MetadataCollector metadataCollector = httpTransferRequest.getOtherMetadataCollector();
+
+ ConnectorParams params = httpTransferRequest.getConnectorParams();
+
+ connector.init(params.getStorageId(), params.getCredentialToken(), params.getResourceServiceHost(),
+ params.getResourceServicePort(), params.getSecretServiceHost(), params.getSecretServicePort());
+
+ metadataCollector.init(params.getResourceServiceHost(), params.getResourceServicePort(),
+ params.getSecretServiceHost(), params.getSecretServicePort());
+
+ Boolean available = metadataCollector.isAvailable(params.getStorageId(),
+ httpTransferRequest.getTargetResourcePath(),
+ params.getCredentialToken());
+
+
+ if (!available) {
+ logger.error("File {} is not available in store {}", httpTransferRequest.getTargetResourcePath(), params.getStorageId());
+ sendError(ctx, NOT_FOUND);
+ return;
+ }
+
+ FileResourceMetadata fileResourceMetadata = metadataCollector.getFileResourceMetadata(params.getStorageId(),
+ httpTransferRequest.getTargetResourcePath(),
+ params.getCredentialToken());
+
+ long fileLength = fileResourceMetadata.getResourceSize();
+
+ HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
+ HttpUtil.setContentLength(response, fileLength);
+ setContentTypeHeader(response, httpTransferRequest.getTargetResourcePath());
+
+ if (HttpUtil.isKeepAlive(request)) {
+ response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
+ }
+
+ // Write the initial line and the header.
+ ctx.write(response);
+
+ // Write the content.
+ ChannelFuture sendFileFuture;
+ ChannelFuture lastContentFuture;
+
+ ConnectorContext connectorContext = new ConnectorContext();
+ connectorContext.setStreamBuffer(new DoubleStreamingBuffer());
+ connectorContext.setTransferId(uri);
+ connectorContext.setMetadata(new FileResourceMetadata()); // TODO Resolve
+
+ TransferTask pullTask = new TransferTask(connector, connectorContext, httpTransferRequest.getTargetResourcePath());
+
+ // TODO aggregate pullStatusFuture and sendFileFuture for keepalive test
+ Future<Integer> pullStatusFuture = executor.submit(pullTask);
+
+ sendFileFuture = ctx.writeAndFlush(new HttpChunkedInput(new ChunkedStream(connectorContext.getStreamBuffer().getInputStream())),
+ ctx.newProgressivePromise());
+
+ // HttpChunkedInput will write the end marker (LastHttpContent) for us.
+ lastContentFuture = sendFileFuture;
+
+ sendFileFuture.addListener(new ChannelProgressiveFutureListener() {
+ @Override
+ public void operationProgressed(ChannelProgressiveFuture future, long progress, long total) {
+ if (total < 0) { // total unknown
+ System.err.println(future.channel() + " Transfer progress: " + progress);
+ } else {
+ System.err.println(future.channel() + " Transfer progress: " + progress + " / " + total);
+ }
+ }
+
+ @Override
+ public void operationComplete(ChannelProgressiveFuture future) {
+ System.err.println(future.channel() + " Transfer complete.");
+ }
+ });
+
+ // Decide whether to close the connection or not.
+ if (!HttpUtil.isKeepAlive(request)) {
+ // Close the connection when the whole content is written out.
+ lastContentFuture.addListener(ChannelFutureListener.CLOSE);
+ }
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+ cause.printStackTrace();
+ if (ctx.channel().isActive()) {
+ sendError(ctx, INTERNAL_SERVER_ERROR);
+ }
+ }
+
+ private static void sendRedirect(ChannelHandlerContext ctx, String newUri) {
+ FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, FOUND);
+ response.headers().set(HttpHeaderNames.LOCATION, newUri);
+
+ // Close the connection as soon as the error message is sent.
+ ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
+ }
+
+ private static void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) {
+ FullHttpResponse response = new DefaultFullHttpResponse(
+ HTTP_1_1, status, Unpooled.copiedBuffer("Failure: " + status + "\r\n", CharsetUtil.UTF_8));
+ response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8");
+
+ // Close the connection as soon as the error message is sent.
+ ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
+ }
+
+ private static void setContentTypeHeader(HttpResponse response, String path) {
+ MimetypesFileTypeMap mimeTypesMap = new MimetypesFileTypeMap();
+ response.headers().set(HttpHeaderNames.CONTENT_TYPE, path);
+ }
+}
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpServerInitializer.java b/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpServerInitializer.java
new file mode 100644
index 0000000..f94d067
--- /dev/null
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpServerInitializer.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.agent.http;
+
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.http.HttpObjectAggregator;
+import io.netty.handler.codec.http.HttpServerCodec;
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.stream.ChunkedWriteHandler;
+
+public class HttpServerInitializer extends ChannelInitializer<SocketChannel> {
+
+ private final SslContext sslCtx;
+ private final HttpTransferRequestsStore transferRequestsStore;
+
+ public HttpServerInitializer(SslContext sslCtx, HttpTransferRequestsStore transferRequestsStore) {
+ this.sslCtx = sslCtx;
+ this.transferRequestsStore = transferRequestsStore;
+ }
+
+ @Override
+ public void initChannel(SocketChannel ch) {
+ ChannelPipeline pipeline = ch.pipeline();
+ if (sslCtx != null) {
+ pipeline.addLast(sslCtx.newHandler(ch.alloc()));
+ }
+ pipeline.addLast(new HttpServerCodec());
+ pipeline.addLast(new HttpObjectAggregator(65536));
+ pipeline.addLast(new ChunkedWriteHandler());
+ pipeline.addLast(new HttpServerHandler(transferRequestsStore));
+ }
+}
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpTransferRequest.java b/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpTransferRequest.java
new file mode 100644
index 0000000..dbc807f
--- /dev/null
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpTransferRequest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.agent.http;
+
+import org.apache.airavata.mft.core.api.Connector;
+import org.apache.airavata.mft.core.api.MetadataCollector;
+
+public class HttpTransferRequest {
+ private Connector otherConnector;
+ private MetadataCollector otherMetadataCollector;
+ private ConnectorParams connectorParams;
+ private String targetResourcePath;
+
+ public Connector getOtherConnector() {
+ return otherConnector;
+ }
+
+ public HttpTransferRequest setOtherConnector(Connector otherConnector) {
+ this.otherConnector = otherConnector;
+ return this;
+ }
+
+ public MetadataCollector getOtherMetadataCollector() {
+ return otherMetadataCollector;
+ }
+
+ public HttpTransferRequest setOtherMetadataCollector(MetadataCollector otherMetadataCollector) {
+ this.otherMetadataCollector = otherMetadataCollector;
+ return this;
+ }
+
+ public String getTargetResourcePath() {
+ return targetResourcePath;
+ }
+
+ public HttpTransferRequest setTargetResourcePath(String targetResourcePath) {
+ this.targetResourcePath = targetResourcePath;
+ return this;
+ }
+
+ public ConnectorParams getConnectorParams() {
+ return connectorParams;
+ }
+
+ public HttpTransferRequest setConnectorParams(ConnectorParams connectorParams) {
+ this.connectorParams = connectorParams;
+ return this;
+ }
+}
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpTransferRequestsStore.java b/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpTransferRequestsStore.java
new file mode 100644
index 0000000..fc38701
--- /dev/null
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpTransferRequestsStore.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.agent.http;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+public class HttpTransferRequestsStore {
+
+ final private Map<String, HttpTransferRequest> downloadRequestStore = new HashMap<>();
+ final private Map<String, HttpTransferRequest> uploadRequestStore = new HashMap<>();
+
+ public String addDownloadRequest(HttpTransferRequest request) {
+ String randomUrl = UUID.randomUUID().toString();
+ downloadRequestStore.put(randomUrl, request);
+ return randomUrl;
+ }
+
+ public HttpTransferRequest getDownloadRequest(String url) {
+
+ //TODO Need to block concurrent calls to same url as connectors are not thread safe
+ HttpTransferRequest request = downloadRequestStore.get(url);
+ return request;
+ }
+
+ public String addUploadRequest(HttpTransferRequest request) {
+ String randomUrl = UUID.randomUUID().toString();
+ uploadRequestStore.put(randomUrl, request);
+ return randomUrl;
+ }
+
+ public HttpTransferRequest getUploadRequest(String url) {
+
+ //TODO Need to block concurrent calls to same url as connectors are not thread safe
+ HttpTransferRequest request = downloadRequestStore.get(url);
+ return request;
+ }
+}
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/rpc/RPCParser.java b/agent/src/main/java/org/apache/airavata/mft/agent/rpc/RPCParser.java
index f136704..dbcc48a 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/rpc/RPCParser.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/rpc/RPCParser.java
@@ -20,12 +20,18 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.airavata.mft.admin.models.rpc.SyncRPCRequest;
import org.apache.airavata.mft.admin.models.rpc.SyncRPCResponse;
+import org.apache.airavata.mft.agent.http.ConnectorParams;
+import org.apache.airavata.mft.agent.http.HttpTransferRequest;
+import org.apache.airavata.mft.agent.http.HttpTransferRequestsStore;
+import org.apache.airavata.mft.core.ConnectorResolver;
import org.apache.airavata.mft.core.DirectoryResourceMetadata;
import org.apache.airavata.mft.core.FileResourceMetadata;
import org.apache.airavata.mft.core.MetadataCollectorResolver;
+import org.apache.airavata.mft.core.api.Connector;
import org.apache.airavata.mft.core.api.MetadataCollector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
import java.util.Optional;
@@ -45,6 +51,9 @@
@org.springframework.beans.factory.annotation.Value("${secret.service.port}")
private int secretServicePort;
+ @Autowired
+ private HttpTransferRequestsStore httpTransferRequestsStore;
+
public String resolveRPCRequest(SyncRPCRequest request) throws Exception {
// TODO implement using the reflection
ObjectMapper mapper = new ObjectMapper();
@@ -64,6 +73,7 @@
return mapper.writeValueAsString(fileResourceMetadata);
}
break;
+
case "getChildFileResourceMetadata":
resourceId = request.getParameters().get("resourceId");
resourceType = request.getParameters().get("resourceType");
@@ -79,6 +89,7 @@
return mapper.writeValueAsString(fileResourceMetadata);
}
break;
+
case "getDirectoryResourceMetadata":
resourceId = request.getParameters().get("resourceId");
resourceType = request.getParameters().get("resourceType");
@@ -93,6 +104,7 @@
return mapper.writeValueAsString(dirResourceMetadata);
}
break;
+
case "getChildDirectoryResourceMetadata":
resourceId = request.getParameters().get("resourceId");
resourceType = request.getParameters().get("resourceType");
@@ -108,6 +120,32 @@
return mapper.writeValueAsString(dirResourceMetadata);
}
break;
+
+ case "submitHttpDownload":
+ String storeId = request.getParameters().get("storeId");
+ String sourcePath = request.getParameters().get("sourcePath");
+ String sourceToken = request.getParameters().get("sourceToken");
+ String storeType = request.getParameters().get("storeType");
+ mftAuthorizationToken = request.getParameters().get("mftAuthorizationToken");
+
+ metadataCollectorOp = MetadataCollectorResolver.resolveMetadataCollector(storeType);
+ Optional<Connector> connectorOp = ConnectorResolver.resolveConnector(storeType, "IN");
+
+ if (metadataCollectorOp.isPresent() && connectorOp.isPresent()) {
+ HttpTransferRequest transferRequest = new HttpTransferRequest();
+ transferRequest.setConnectorParams(new ConnectorParams()
+ .setResourceServiceHost(resourceServiceHost)
+ .setResourceServicePort(resourceServicePort)
+ .setSecretServiceHost(secretServiceHost)
+ .setSecretServicePort(secretServicePort)
+ .setStorageId(storeId).setCredentialToken(sourceToken));
+ transferRequest.setTargetResourcePath(sourcePath);
+ transferRequest.setOtherMetadataCollector(metadataCollectorOp.get());
+ transferRequest.setOtherConnector(connectorOp.get());
+ String url = httpTransferRequestsStore.addDownloadRequest(transferRequest);
+ return url;
+ }
+ break;
}
logger.error("Unknown method type specified {}", request.getMethod());
throw new Exception("Unknown method " + request.getMethod());
diff --git a/api/service/src/main/java/org/apache/airavata/mft/api/handler/MFTApiHandler.java b/api/service/src/main/java/org/apache/airavata/mft/api/handler/MFTApiHandler.java
index 108bd30..1a6a8b9 100644
--- a/api/service/src/main/java/org/apache/airavata/mft/api/handler/MFTApiHandler.java
+++ b/api/service/src/main/java/org/apache/airavata/mft/api/handler/MFTApiHandler.java
@@ -92,6 +92,51 @@
}
@Override
+ public void submitHttpUpload(HttpUploadApiRequest request, StreamObserver<HttpUploadApiResponse> responseObserver) {
+ super.submitHttpUpload(request, responseObserver);
+ }
+
+ @Override
+ public void submitHttpDownload(HttpDownloadApiRequest request, StreamObserver<HttpDownloadApiResponse> responseObserver) {
+ try {
+
+ // TODO : Automatically derive agent if the target agent is empty
+ SyncRPCRequest.SyncRPCRequestBuilder requestBuilder = SyncRPCRequest.SyncRPCRequestBuilder.builder()
+ .withAgentId(request.getTargetAgent())
+ .withMessageId(UUID.randomUUID().toString())
+ .withMethod("submitHttpDownload")
+ .withParameter("storeId", request.getSourceStoreId())
+ .withParameter("sourcePath", request.getSourcePath())
+ .withParameter("sourceToken", request.getSourceToken())
+ .withParameter("storeType", request.getSourceType())
+ .withParameter("mftAuthorizationToken", request.getMftAuthorizationToken());
+
+ SyncRPCResponse rpcResponse = agentRPCClient.sendSyncRequest(requestBuilder.build());
+
+ switch (rpcResponse.getResponseStatus()) {
+ case SUCCESS:
+ String url = rpcResponse.getResponseAsStr();
+ HttpDownloadApiResponse downloadResponse = HttpDownloadApiResponse.newBuilder()
+ .setUrl(url)
+ .setTargetAgent(request.getTargetAgent()).build();
+ responseObserver.onNext(downloadResponse);
+ responseObserver.onCompleted();
+ return;
+ case FAIL:
+ logger.error("Errored while processing the download request to resource {} in store {}. Error msg : {}",
+ request.getSourcePath(), request.getSourceStoreId(), rpcResponse.getErrorAsStr());
+ responseObserver.onError(new Exception("Errored while processing the the fetch file metadata response. Error msg : " +
+ rpcResponse.getErrorAsStr()));
+ }
+
+ } catch (Exception e) {
+ logger.error("Error while submitting http download request to path {} in store {}",
+ request.getSourcePath(), request.getSourceStoreId() , e);
+ responseObserver.onError(new Exception("Failed to submit http download request", e));
+ }
+ }
+
+ @Override
public void getTransferStates(TransferStateApiRequest request, StreamObserver<TransferStateApiResponse> responseObserver) {
try {
List<TransferState> states = mftConsulClient.getTransferStates(request.getTransferId());
diff --git a/api/stub/src/main/proto/MFTApi.proto b/api/stub/src/main/proto/MFTApi.proto
index dc003ef..891413c 100644
--- a/api/stub/src/main/proto/MFTApi.proto
+++ b/api/stub/src/main/proto/MFTApi.proto
@@ -7,25 +7,55 @@
import "google/protobuf/empty.proto";
message TransferApiRequest {
- string sourceId = 1;
- string sourceType = 2;
- string sourceToken = 3;
- string sourceResourceBackend = 4;
- string sourceCredentialBackend = 5;
- string destinationId = 6;
- string destinationType = 7;
- string destinationToken = 8;
- string destResourceBackend = 9;
- string destCredentialBackend = 10;
- bool affinityTransfer = 11;
- map<string, int32> targetAgents = 12;
- string mftAuthorizationToken = 13;
+ string sourceStorageId = 1;
+ string sourcePath = 2;
+ string sourceType = 3;
+ string sourceToken = 4;
+ string sourceResourceBackend = 5;
+ string sourceCredentialBackend = 6;
+ string destinationStorageId = 7;
+ string destinationPath = 8;
+ string destinationType = 9;
+ string destinationToken = 10;
+ string destResourceBackend = 11;
+ string destCredentialBackend = 12;
+ bool affinityTransfer = 13;
+ map<string, int32> targetAgents = 14;
+ string mftAuthorizationToken = 15;
}
message TransferApiResponse {
string transferId = 1;
}
+message HttpUploadApiRequest {
+ string destinationStoreId = 1;
+ string destinationPath = 2;
+ string destinationToken = 3;
+ string destinationType = 4;
+ string targetAgent = 5;
+ string mftAuthorizationToken = 6;
+}
+
+message HttpUploadApiResponse {
+ string url = 1;
+ string targetAgent = 2;
+}
+
+message HttpDownloadApiRequest {
+ string sourceStoreId = 1;
+ string sourcePath = 2;
+ string sourceToken = 3;
+ string sourceType = 4;
+ string targetAgent = 5;
+ string mftAuthorizationToken = 6;
+}
+
+message HttpDownloadApiResponse {
+ string url = 1;
+ string targetAgent = 2;
+}
+
message TransferStateApiRequest {
string transferId = 1;
string mftAuthorizationToken = 2;
@@ -93,6 +123,18 @@
};
}
+ rpc submitHttpUpload(HttpUploadApiRequest) returns (HttpUploadApiResponse) {
+ option (google.api.http) = {
+ post: "/v1.0/api/http-upload"
+ };
+ }
+
+ rpc submitHttpDownload(HttpDownloadApiRequest) returns (HttpDownloadApiResponse) {
+ option (google.api.http) = {
+ post: "/v1.0/api/http-download"
+ };
+ }
+
rpc getTransferStates(TransferStateApiRequest) returns (stream TransferStateApiResponse) {
option (google.api.http) = {
get: "/v1.0/api/transfer/states"
diff --git a/controller/src/main/java/org/apache/airavata/mft/controller/MFTController.java b/controller/src/main/java/org/apache/airavata/mft/controller/MFTController.java
index 0b0f2cd..ec4240c 100644
--- a/controller/src/main/java/org/apache/airavata/mft/controller/MFTController.java
+++ b/controller/src/main/java/org/apache/airavata/mft/controller/MFTController.java
@@ -221,12 +221,14 @@
private TransferCommand convertRequestToCommand(String transferId, TransferRequest transferRequest) {
TransferCommand transferCommand = new TransferCommand();
- transferCommand.setSourceId(transferRequest.getSourceId())
+ transferCommand.setSourceStorageId(transferRequest.getSourceStorageId())
+ .setSourcePath(transferRequest.getSourcePath())
.setSourceToken(transferRequest.getSourceToken())
.setSourceType(transferRequest.getSourceType())
.setSourceResourceBackend(transferRequest.getSourceResourceBackend())
.setSourceCredentialBackend(transferRequest.getSourceCredentialBackend())
- .setDestinationId(transferRequest.getDestinationId())
+ .setDestinationStorageId(transferRequest.getDestinationStorageId())
+ .setDestinationPath(transferRequest.getDestinationPath())
.setDestinationToken(transferRequest.getDestinationToken())
.setDestinationType(transferRequest.getDestinationType())
.setDestResourceBackend(transferRequest.getDestResourceBackend())
diff --git a/core/src/main/java/org/apache/airavata/mft/core/DirectoryResourceMetadata.java b/core/src/main/java/org/apache/airavata/mft/core/DirectoryResourceMetadata.java
index 5eb9813..5ff3b5f 100644
--- a/core/src/main/java/org/apache/airavata/mft/core/DirectoryResourceMetadata.java
+++ b/core/src/main/java/org/apache/airavata/mft/core/DirectoryResourceMetadata.java
@@ -26,8 +26,6 @@
private long createdTime;
private long updateTime;
private String resourcePath;
- private String parentResourceId;
- private String parentResourceType;
private List<DirectoryResourceMetadata> directories = new ArrayList<>();
private List<FileResourceMetadata> files = new ArrayList<>();
private boolean lazyInitialized = true;
@@ -68,24 +66,6 @@
return this;
}
- public String getParentResourceId() {
- return parentResourceId;
- }
-
- public DirectoryResourceMetadata setParentResourceId(String parentResourceId) {
- this.parentResourceId = parentResourceId;
- return this;
- }
-
- public String getParentResourceType() {
- return parentResourceType;
- }
-
- public DirectoryResourceMetadata setParentResourceType(String parentResourceType) {
- this.parentResourceType = parentResourceType;
- return this;
- }
-
public List<DirectoryResourceMetadata> getDirectories() {
return directories;
}
@@ -119,8 +99,6 @@
private long createdTime;
private long updateTime;
private String resourcePath;
- private String parentResourceId;
- private String parentResourceType;
private List<DirectoryResourceMetadata> directories = new ArrayList<>();
private List<FileResourceMetadata> files = new ArrayList<>();
private boolean lazyInitialized = true;
@@ -152,16 +130,6 @@
return this;
}
- public Builder withParentResourceId(String parentResourceId) {
- this.parentResourceId = parentResourceId;
- return this;
- }
-
- public Builder withParentResourceType(String parentResourceType) {
- this.parentResourceType = parentResourceType;
- return this;
- }
-
public Builder withDirectories(List<DirectoryResourceMetadata> directories) {
this.directories = directories;
return this;
@@ -193,8 +161,6 @@
directoryResourceMetadata.setCreatedTime(createdTime);
directoryResourceMetadata.setUpdateTime(updateTime);
directoryResourceMetadata.setResourcePath(resourcePath);
- directoryResourceMetadata.setParentResourceId(parentResourceId);
- directoryResourceMetadata.setParentResourceType(parentResourceType);
directoryResourceMetadata.setDirectories(directories);
directoryResourceMetadata.setFiles(files);
directoryResourceMetadata.setLazyInitialized(lazyInitialized);
diff --git a/core/src/main/java/org/apache/airavata/mft/core/FileResourceMetadata.java b/core/src/main/java/org/apache/airavata/mft/core/FileResourceMetadata.java
index 0b2372e..d61d743 100644
--- a/core/src/main/java/org/apache/airavata/mft/core/FileResourceMetadata.java
+++ b/core/src/main/java/org/apache/airavata/mft/core/FileResourceMetadata.java
@@ -25,8 +25,6 @@
private long updateTime;
private String md5sum;
private String resourcePath;
- private String parentResourceId;
- private String parentResourceType;
public String getFriendlyName() {
return friendlyName;
@@ -82,24 +80,6 @@
return this;
}
- public String getParentResourceId() {
- return parentResourceId;
- }
-
- public FileResourceMetadata setParentResourceId(String parentResourceId) {
- this.parentResourceId = parentResourceId;
- return this;
- }
-
- public String getParentResourceType() {
- return parentResourceType;
- }
-
- public FileResourceMetadata setParentResourceType(String parentResourceType) {
- this.parentResourceType = parentResourceType;
- return this;
- }
-
public static final class Builder {
private String friendlyName;
private long resourceSize;
@@ -107,8 +87,6 @@
private long updateTime;
private String md5sum;
private String resourcePath;
- private String parentResourceId;
- private String parentResourceType;
private Builder() {
}
@@ -147,15 +125,6 @@
return this;
}
- public Builder withParentResourceId(String parentResourceId) {
- this.parentResourceId = parentResourceId;
- return this;
- }
-
- public Builder withParentResourceType(String parentResourceType) {
- this.parentResourceType = parentResourceType;
- return this;
- }
public FileResourceMetadata build() {
FileResourceMetadata fileResourceMetadata = new FileResourceMetadata();
@@ -165,8 +134,6 @@
fileResourceMetadata.setUpdateTime(updateTime);
fileResourceMetadata.setMd5sum(md5sum);
fileResourceMetadata.setResourcePath(resourcePath);
- fileResourceMetadata.setParentResourceId(parentResourceId);
- fileResourceMetadata.setParentResourceType(parentResourceType);
return fileResourceMetadata;
}
}
diff --git a/core/src/main/java/org/apache/airavata/mft/core/TransferTask.java b/core/src/main/java/org/apache/airavata/mft/core/TransferTask.java
index b90f4cc..860e9f7 100644
--- a/core/src/main/java/org/apache/airavata/mft/core/TransferTask.java
+++ b/core/src/main/java/org/apache/airavata/mft/core/TransferTask.java
@@ -17,7 +17,6 @@
package org.apache.airavata.mft.core;
-import org.apache.airavata.mft.core.ConnectorContext;
import org.apache.airavata.mft.core.api.Connector;
import java.util.concurrent.Callable;
@@ -26,15 +25,17 @@
private Connector connector;
private ConnectorContext context;
+ private String resourcePath;
- public TransferTask(Connector connector, ConnectorContext context) {
+ public TransferTask(Connector connector, ConnectorContext context, String resourcePath) {
this.connector = connector;
this.context = context;
+ this.resourcePath = resourcePath;
}
@Override
public Integer call() throws Exception {
- this.connector.startStream(context);
+ this.connector.startStream(resourcePath, context);
return 0;
}
}
diff --git a/core/src/main/java/org/apache/airavata/mft/core/api/Connector.java b/core/src/main/java/org/apache/airavata/mft/core/api/Connector.java
index 772d90e..8264d58 100644
--- a/core/src/main/java/org/apache/airavata/mft/core/api/Connector.java
+++ b/core/src/main/java/org/apache/airavata/mft/core/api/Connector.java
@@ -20,8 +20,8 @@
import org.apache.airavata.mft.core.ConnectorContext;
public interface Connector {
- public void init(String resourceId, String credentialToken, String resourceServiceHost, int resourceServicePort,
+ public void init(String storageId, String credentialToken, String resourceServiceHost, int resourceServicePort,
String secretServiceHost, int secretServicePort) throws Exception;
public void destroy();
- void startStream(ConnectorContext context) throws Exception;
+ void startStream(String targetPath, ConnectorContext context) throws Exception;
}
diff --git a/core/src/main/java/org/apache/airavata/mft/core/api/MetadataCollector.java b/core/src/main/java/org/apache/airavata/mft/core/api/MetadataCollector.java
index e784867..38a5529 100644
--- a/core/src/main/java/org/apache/airavata/mft/core/api/MetadataCollector.java
+++ b/core/src/main/java/org/apache/airavata/mft/core/api/MetadataCollector.java
@@ -43,17 +43,15 @@
public FileResourceMetadata getFileResourceMetadata(String resourceId, String credentialToken) throws Exception;
/**
- * Fetches a metadata of given File Resource inside a registered directory resource. Target file might be living in
- * multiple level below the parent directory
+ * Fetches a metadata of given File Resource
*
- * @param parentResourceId parent directory resource id
- * @param resourcePath path of the target resource. This should be a child path of the parent resource
+ * @param storageId id of the storage resource
+ * @param resourcePath resource path
* @param credentialToken credential token for the resource
* @return an object of {@link FileResourceMetadata}
- * @throws Exception if the parent resource is not a Directory resource or the target resource is not a File Resource type
- * or the resource can't be fetched from the resource service
+ * @throws Exception if the resource id is not a File Resource type or the resource can't be fetched from the resource service
*/
- public FileResourceMetadata getFileResourceMetadata(String parentResourceId, String resourcePath, String credentialToken) throws Exception;
+ public FileResourceMetadata getFileResourceMetadata(String storageId, String resourcePath, String credentialToken) throws Exception;
/**
* Fetches a metadata of given Directory Resource
@@ -66,17 +64,14 @@
public DirectoryResourceMetadata getDirectoryResourceMetadata(String resourceId, String credentialToken) throws Exception;
/**
- * Fetches a metadata of given Directory Resource inside a registered directory resource. Target directory might be living in
- * multiple level below the parent directory
+ * Fetches a metadata of given Directory Resource
*
- * @param parentResourceId parent directory resource id
- * @param resourcePath path of the target resource. This should be a child path of the parent resource
- * @param credentialToken credential token for the resource
+ * @param storageId id of the storage resource
+ * @param resourcePath resource path
* @return an object of {@link DirectoryResourceMetadata}
- * @throws Exception if the parent resource is not a Directory resource or the target resource is not a Directory Resource type
- * or the resource can't be fetched from the resource service
+ * @throws Exception if the resource id is not a Directory Resource type or the resource can't be fetched from the resource service
*/
- public DirectoryResourceMetadata getDirectoryResourceMetadata(String parentResourceId, String resourcePath, String credentialToken) throws Exception;
+ public DirectoryResourceMetadata getDirectoryResourceMetadata(String storageId, String resourcePath, String credentialToken) throws Exception;
/**
* Check whether the resource is available in the actual storage
@@ -87,4 +82,15 @@
* @throws Exception if the resource details can not be fetched from the resource service
*/
public Boolean isAvailable(String resourceId, String credentialToken) throws Exception;
+
+ /**
+ * Check whether the resource is available in the actual storage
+ *
+ * @param storageId id of the storage
+ * @param resourcePath resource path
+ * @param credentialToken credential token for the resource
+ * @return true of the resource is available false otherwise
+ * @throws Exception if the resource details can not be fetched from the resource service
+ */
+ public Boolean isAvailable(String storageId, String resourcePath, String credentialToken) throws Exception;
}
diff --git a/examples/src/main/java/org/apache/airavata/mft/examples/http/DownloadExample.java b/examples/src/main/java/org/apache/airavata/mft/examples/http/DownloadExample.java
new file mode 100644
index 0000000..93b5d83
--- /dev/null
+++ b/examples/src/main/java/org/apache/airavata/mft/examples/http/DownloadExample.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.examples.http;
+
+import org.apache.airavata.mft.api.client.MFTApiClient;
+import org.apache.airavata.mft.api.service.HttpDownloadApiRequest;
+import org.apache.airavata.mft.api.service.HttpDownloadApiResponse;
+import org.apache.airavata.mft.api.service.MFTApiServiceGrpc;
+
+public class DownloadExample {
+ public static void main(String args[]) {
+ MFTApiServiceGrpc.MFTApiServiceBlockingStub client = MFTApiClient.buildClient("localhost", 7004);
+ HttpDownloadApiResponse httpDownloadApiResponse = client.submitHttpDownload(HttpDownloadApiRequest.newBuilder()
+ .setTargetAgent("agent0")
+ .setSourcePath("/tmp/a.txt")
+ .setSourceStoreId("remote-ssh-storage")
+ .setSourceToken("local-ssh-cred")
+ .setSourceType("SCP")
+ .setMftAuthorizationToken("")
+ .build());
+
+ System.out.println(httpDownloadApiResponse);
+ }
+}
diff --git a/examples/src/main/java/org/apache/airavata/mft/examples/transfer/LocalExample.java b/examples/src/main/java/org/apache/airavata/mft/examples/transfer/LocalExample.java
index 53bfa71..af73d7c 100644
--- a/examples/src/main/java/org/apache/airavata/mft/examples/transfer/LocalExample.java
+++ b/examples/src/main/java/org/apache/airavata/mft/examples/transfer/LocalExample.java
@@ -26,18 +26,22 @@
public static void main(String args[]) throws Exception {
MFTApiServiceGrpc.MFTApiServiceBlockingStub client = MFTApiClient.buildClient("localhost", 7004);
- String sourceId = "remote-ssh-resource2";
+ String sourceStorageId = "remote-ssh-storage";
+ String sourceResourcePath = "/tmp/1mb.txt";
String sourceToken = "local-ssh-cred";
- String destId = "10mb-file";
+ String destStorageId = "local-storage-1";
+ String destResourcePath = "/tmp/1mb-copy.txt";
String destToken = "";
String mftAuthorizationToken = "43ff79ac-e4f2-473c-9ea1-04eee9509a53";
TransferApiRequest request = TransferApiRequest.newBuilder()
.setMftAuthorizationToken(mftAuthorizationToken)
- .setSourceId(sourceId)
+ .setSourceStorageId(sourceStorageId)
+ .setSourcePath(sourceResourcePath)
.setSourceToken(sourceToken)
.setSourceType("SCP")
- .setDestinationId(destId)
+ .setDestinationStorageId(destStorageId)
+ .setDestinationPath(destResourcePath)
.setDestinationToken(destToken)
.setDestinationType("LOCAL")
.setAffinityTransfer(false).build();
diff --git a/examples/src/main/java/org/apache/airavata/mft/examples/transfer/S3Example.java b/examples/src/main/java/org/apache/airavata/mft/examples/transfer/S3Example.java
index 8443cf2..9df81e2 100644
--- a/examples/src/main/java/org/apache/airavata/mft/examples/transfer/S3Example.java
+++ b/examples/src/main/java/org/apache/airavata/mft/examples/transfer/S3Example.java
@@ -26,18 +26,22 @@
public static void main(String args[]) throws Exception {
MFTApiServiceGrpc.MFTApiServiceBlockingStub client = MFTApiClient.buildClient("localhost", 7004);
- String sourceId = "remote-ssh-resource2";
- String sourceToken = "local-ssh-cred";
- String destId = "s3-file";
+ String sourceStorageId = "remote-ssh-storage";
+ String sourceResourcePath = "/tmp/1mb.txt";
+ String sourceToken = "ssh-cred";
+ String destStorageId = "s3-storage-1";
+ String destResourcePath = "1mb-copy.txt";
String destToken = "s3-cred";
String mftAuthorizationToken = "43ff79ac-e4f2-473c-9ea1-04eee9509a53";
TransferApiRequest request = TransferApiRequest.newBuilder()
.setMftAuthorizationToken(mftAuthorizationToken)
- .setSourceId(sourceId)
+ .setSourceStorageId(sourceStorageId)
+ .setSourcePath(sourceResourcePath)
.setSourceToken(sourceToken)
.setSourceType("SCP")
- .setDestinationId(destId)
+ .setDestinationStorageId(destStorageId)
+ .setDestinationPath(destResourcePath)
.setDestinationToken(destToken)
.setDestinationType("S3")
.setAffinityTransfer(false).build();
diff --git a/examples/src/main/java/org/apache/airavata/mft/examples/transfer/SCPExample.java b/examples/src/main/java/org/apache/airavata/mft/examples/transfer/SCPExample.java
index 5977598..72ffac4 100644
--- a/examples/src/main/java/org/apache/airavata/mft/examples/transfer/SCPExample.java
+++ b/examples/src/main/java/org/apache/airavata/mft/examples/transfer/SCPExample.java
@@ -26,18 +26,22 @@
public static void main(String args[]) throws Exception {
MFTApiServiceGrpc.MFTApiServiceBlockingStub client = MFTApiClient.buildClient("localhost", 7004);
- String sourceId = "remote-ssh-resource2";
- String sourceToken = "local-ssh-cred";
- String destId = "remote-ssh-resource";
- String destToken = "local-ssh-cred";
+ String sourceStorageId = "remote-ssh-storage-1";
+ String sourceResourcePath = "/tmp/1mb.txt";
+ String sourceToken = "ssh-cred-1";
+ String destStorageId = "remote-ssh-storage-2";
+ String destResourcePath = "/tmp/1mb-copy.txt";
+ String destToken = "ssh-cred-2";
String mftAuthorizationToken = "43ff79ac-e4f2-473c-9ea1-04eee9509a53";
TransferApiRequest request = TransferApiRequest.newBuilder()
.setMftAuthorizationToken(mftAuthorizationToken)
- .setSourceId(sourceId)
+ .setSourceStorageId(sourceStorageId)
+ .setSourcePath(sourceResourcePath)
.setSourceToken(sourceToken)
.setSourceType("SCP")
- .setDestinationId(destId)
+ .setDestinationStorageId(destStorageId)
+ .setDestinationPath(destResourcePath)
.setDestinationToken(destToken)
.setDestinationType("SCP")
.setAffinityTransfer(false).build();
diff --git a/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/ResourceBackend.java b/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/ResourceBackend.java
index 2e3a550..d924995 100644
--- a/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/ResourceBackend.java
+++ b/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/ResourceBackend.java
@@ -18,13 +18,19 @@
package org.apache.airavata.mft.resource.server.backend;
import org.apache.airavata.mft.resource.stubs.azure.resource.*;
+import org.apache.airavata.mft.resource.stubs.azure.storage.*;
import org.apache.airavata.mft.resource.stubs.box.resource.*;
+import org.apache.airavata.mft.resource.stubs.box.storage.*;
import org.apache.airavata.mft.resource.stubs.dropbox.resource.*;
+import org.apache.airavata.mft.resource.stubs.dropbox.storage.*;
import org.apache.airavata.mft.resource.stubs.ftp.resource.*;
import org.apache.airavata.mft.resource.stubs.ftp.storage.*;
import org.apache.airavata.mft.resource.stubs.gcs.resource.*;
+import org.apache.airavata.mft.resource.stubs.gcs.storage.*;
import org.apache.airavata.mft.resource.stubs.local.resource.*;
+import org.apache.airavata.mft.resource.stubs.local.storage.*;
import org.apache.airavata.mft.resource.stubs.s3.resource.*;
+import org.apache.airavata.mft.resource.stubs.s3.storage.*;
import org.apache.airavata.mft.resource.stubs.scp.resource.*;
import org.apache.airavata.mft.resource.stubs.scp.storage.*;
@@ -45,31 +51,61 @@
public boolean updateSCPResource(SCPResourceUpdateRequest request) throws Exception;
public boolean deleteSCPResource(SCPResourceDeleteRequest request) throws Exception;
+ public Optional<LocalStorage> getLocalStorage(LocalStorageGetRequest request) throws Exception;
+ public LocalStorage createLocalStorage(LocalStorageCreateRequest request) throws Exception;
+ public boolean updateLocalStorage(LocalStorageUpdateRequest request) throws Exception;
+ public boolean deleteLocalStorage(LocalStorageDeleteRequest request) throws Exception;
+
public Optional<LocalResource> getLocalResource(LocalResourceGetRequest request) throws Exception;
public LocalResource createLocalResource(LocalResourceCreateRequest request) throws Exception;
public boolean updateLocalResource(LocalResourceUpdateRequest request) throws Exception;
public boolean deleteLocalResource(LocalResourceDeleteRequest request) throws Exception;
+ public Optional<S3Storage> getS3Storage(S3StorageGetRequest request) throws Exception;
+ public S3Storage createS3Storage(S3StorageCreateRequest request) throws Exception;
+ public boolean updateS3Storage(S3StorageUpdateRequest request) throws Exception;
+ public boolean deleteS3Storage(S3StorageDeleteRequest request) throws Exception;
+
public Optional<S3Resource> getS3Resource(S3ResourceGetRequest request) throws Exception;
public S3Resource createS3Resource(S3ResourceCreateRequest request) throws Exception;
public boolean updateS3Resource(S3ResourceUpdateRequest request) throws Exception;
public boolean deleteS3Resource(S3ResourceDeleteRequest request) throws Exception;
+ public Optional<BoxStorage> getBoxStorage(BoxStorageGetRequest request) throws Exception;
+ public BoxStorage createBoxStorage(BoxStorageCreateRequest request) throws Exception;
+ public boolean updateBoxStorage(BoxStorageUpdateRequest request) throws Exception;
+ public boolean deleteBoxStorage(BoxStorageDeleteRequest request) throws Exception;
+
public Optional<BoxResource> getBoxResource(BoxResourceGetRequest request) throws Exception;
public BoxResource createBoxResource(BoxResourceCreateRequest request) throws Exception;
public boolean updateBoxResource(BoxResourceUpdateRequest request) throws Exception;
public boolean deleteBoxResource(BoxResourceDeleteRequest request) throws Exception;
+ public Optional<AzureStorage> getAzureStorage(AzureStorageGetRequest request) throws Exception;
+ public AzureStorage createAzureStorage(AzureStorageCreateRequest request) throws Exception;
+ public boolean updateAzureStorage(AzureStorageUpdateRequest request) throws Exception;
+ public boolean deleteAzureStorage(AzureStorageDeleteRequest request) throws Exception;
+
public Optional<AzureResource> getAzureResource(AzureResourceGetRequest request) throws Exception;
public AzureResource createAzureResource(AzureResourceCreateRequest request) throws Exception;
public boolean updateAzureResource(AzureResourceUpdateRequest request) throws Exception;
public boolean deleteAzureResource(AzureResourceDeleteRequest request) throws Exception;
+ public Optional<GCSStorage> getGCSStorage(GCSStorageGetRequest request) throws Exception;
+ public GCSStorage createGCSStorage(GCSStorageCreateRequest request) throws Exception;
+ public boolean updateGCSStorage(GCSStorageUpdateRequest request) throws Exception;
+ public boolean deleteGCSStorage(GCSStorageDeleteRequest request) throws Exception;
+
public Optional<GCSResource> getGCSResource(GCSResourceGetRequest request) throws Exception;
public GCSResource createGCSResource(GCSResourceCreateRequest request) throws Exception;
public boolean updateGCSResource(GCSResourceUpdateRequest request) throws Exception;
public boolean deleteGCSResource(GCSResourceDeleteRequest request) throws Exception;
+ public Optional<DropboxStorage> getDropboxStorage(DropboxStorageGetRequest request) throws Exception;
+ public DropboxStorage createDropboxStorage(DropboxStorageCreateRequest request) throws Exception;
+ public boolean updateDropboxStorage(DropboxStorageUpdateRequest request) throws Exception;
+ public boolean deleteDropboxStorage(DropboxStorageDeleteRequest request) throws Exception;
+
public Optional<DropboxResource> getDropboxResource(DropboxResourceGetRequest request) throws Exception;
public DropboxResource createDropboxResource(DropboxResourceCreateRequest request) throws Exception;
public boolean updateDropboxResource(DropboxResourceUpdateRequest request) throws Exception;
diff --git a/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/airavata/AiravataResourceBackend.java b/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/airavata/AiravataResourceBackend.java
index 6299a07..a5a78f1 100644
--- a/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/airavata/AiravataResourceBackend.java
+++ b/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/airavata/AiravataResourceBackend.java
@@ -19,14 +19,20 @@
import org.apache.airavata.mft.resource.server.backend.ResourceBackend;
import org.apache.airavata.mft.resource.stubs.azure.resource.*;
+import org.apache.airavata.mft.resource.stubs.azure.storage.*;
import org.apache.airavata.mft.resource.stubs.box.resource.*;
+import org.apache.airavata.mft.resource.stubs.box.storage.*;
import org.apache.airavata.mft.resource.stubs.common.FileResource;
import org.apache.airavata.mft.resource.stubs.dropbox.resource.*;
+import org.apache.airavata.mft.resource.stubs.dropbox.storage.*;
import org.apache.airavata.mft.resource.stubs.ftp.resource.*;
import org.apache.airavata.mft.resource.stubs.ftp.storage.*;
import org.apache.airavata.mft.resource.stubs.gcs.resource.*;
+import org.apache.airavata.mft.resource.stubs.gcs.storage.*;
import org.apache.airavata.mft.resource.stubs.local.resource.*;
+import org.apache.airavata.mft.resource.stubs.local.storage.*;
import org.apache.airavata.mft.resource.stubs.s3.resource.*;
+import org.apache.airavata.mft.resource.stubs.s3.storage.*;
import org.apache.airavata.mft.resource.stubs.scp.resource.*;
import org.apache.airavata.mft.resource.stubs.scp.storage.*;
import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
@@ -155,6 +161,26 @@
}
@Override
+ public Optional<LocalStorage> getLocalStorage(LocalStorageGetRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+ }
+
+ @Override
+ public LocalStorage createLocalStorage(LocalStorageCreateRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+ }
+
+ @Override
+ public boolean updateLocalStorage(LocalStorageUpdateRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+ }
+
+ @Override
+ public boolean deleteLocalStorage(LocalStorageDeleteRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+ }
+
+ @Override
public Optional<LocalResource> getLocalResource(LocalResourceGetRequest request) {
throw new UnsupportedOperationException("Operation is not supported in backend");
@@ -179,6 +205,26 @@
}
@Override
+ public Optional<S3Storage> getS3Storage(S3StorageGetRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+ }
+
+ @Override
+ public S3Storage createS3Storage(S3StorageCreateRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+ }
+
+ @Override
+ public boolean updateS3Storage(S3StorageUpdateRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+ }
+
+ @Override
+ public boolean deleteS3Storage(S3StorageDeleteRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+ }
+
+ @Override
public Optional<S3Resource> getS3Resource(S3ResourceGetRequest request) throws Exception {
throw new UnsupportedOperationException("Operation is not supported in backend");
@@ -203,6 +249,26 @@
}
@Override
+ public Optional<BoxStorage> getBoxStorage(BoxStorageGetRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+ }
+
+ @Override
+ public BoxStorage createBoxStorage(BoxStorageCreateRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+ }
+
+ @Override
+ public boolean updateBoxStorage(BoxStorageUpdateRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+ }
+
+ @Override
+ public boolean deleteBoxStorage(BoxStorageDeleteRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+ }
+
+ @Override
public Optional<BoxResource> getBoxResource(BoxResourceGetRequest request) throws Exception {
throw new UnsupportedOperationException("Operation is not supported in backend");
}
@@ -223,6 +289,26 @@
}
@Override
+ public Optional<AzureStorage> getAzureStorage(AzureStorageGetRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+ }
+
+ @Override
+ public AzureStorage createAzureStorage(AzureStorageCreateRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+ }
+
+ @Override
+ public boolean updateAzureStorage(AzureStorageUpdateRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+ }
+
+ @Override
+ public boolean deleteAzureStorage(AzureStorageDeleteRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+ }
+
+ @Override
public Optional<AzureResource> getAzureResource(AzureResourceGetRequest request) throws Exception {
throw new UnsupportedOperationException("Operation is not supported in backend");
}
@@ -243,6 +329,26 @@
}
@Override
+ public Optional<GCSStorage> getGCSStorage(GCSStorageGetRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+ }
+
+ @Override
+ public GCSStorage createGCSStorage(GCSStorageCreateRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+ }
+
+ @Override
+ public boolean updateGCSStorage(GCSStorageUpdateRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+ }
+
+ @Override
+ public boolean deleteGCSStorage(GCSStorageDeleteRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+ }
+
+ @Override
public Optional<GCSResource> getGCSResource(GCSResourceGetRequest request) throws Exception {
throw new UnsupportedOperationException("Operation is not supported in backend");
}
@@ -261,6 +367,27 @@
public boolean deleteGCSResource(GCSResourceDeleteRequest request) throws Exception {
throw new UnsupportedOperationException("Operation is not supported in backend");
}
+
+ @Override
+ public Optional<DropboxStorage> getDropboxStorage(DropboxStorageGetRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+ }
+
+ @Override
+ public DropboxStorage createDropboxStorage(DropboxStorageCreateRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+ }
+
+ @Override
+ public boolean updateDropboxStorage(DropboxStorageUpdateRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+ }
+
+ @Override
+ public boolean deleteDropboxStorage(DropboxStorageDeleteRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+ }
+
@Override
public Optional<DropboxResource> getDropboxResource(DropboxResourceGetRequest request) throws Exception {
throw new UnsupportedOperationException("Operation is not supported in backend");
diff --git a/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/file/FileBasedResourceBackend.java b/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/file/FileBasedResourceBackend.java
index 3841409..eace41a 100644
--- a/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/file/FileBasedResourceBackend.java
+++ b/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/file/FileBasedResourceBackend.java
@@ -19,21 +19,21 @@
import org.apache.airavata.mft.resource.server.backend.ResourceBackend;
import org.apache.airavata.mft.resource.stubs.azure.resource.*;
-import org.apache.airavata.mft.resource.stubs.azure.storage.AzureStorage;
+import org.apache.airavata.mft.resource.stubs.azure.storage.*;
import org.apache.airavata.mft.resource.stubs.box.resource.*;
-import org.apache.airavata.mft.resource.stubs.box.storage.BoxStorage;
+import org.apache.airavata.mft.resource.stubs.box.storage.*;
import org.apache.airavata.mft.resource.stubs.common.DirectoryResource;
import org.apache.airavata.mft.resource.stubs.common.FileResource;
import org.apache.airavata.mft.resource.stubs.dropbox.resource.*;
-import org.apache.airavata.mft.resource.stubs.dropbox.storage.DropboxStorage;
+import org.apache.airavata.mft.resource.stubs.dropbox.storage.*;
import org.apache.airavata.mft.resource.stubs.ftp.resource.*;
import org.apache.airavata.mft.resource.stubs.ftp.storage.*;
import org.apache.airavata.mft.resource.stubs.gcs.resource.*;
-import org.apache.airavata.mft.resource.stubs.gcs.storage.GCSStorage;
+import org.apache.airavata.mft.resource.stubs.gcs.storage.*;
import org.apache.airavata.mft.resource.stubs.local.resource.*;
-import org.apache.airavata.mft.resource.stubs.local.storage.LocalStorage;
+import org.apache.airavata.mft.resource.stubs.local.storage.*;
import org.apache.airavata.mft.resource.stubs.s3.resource.*;
-import org.apache.airavata.mft.resource.stubs.s3.storage.S3Storage;
+import org.apache.airavata.mft.resource.stubs.s3.storage.*;
import org.apache.airavata.mft.resource.stubs.scp.resource.*;
import org.apache.airavata.mft.resource.stubs.scp.storage.*;
import org.json.simple.JSONArray;
@@ -57,6 +57,9 @@
@org.springframework.beans.factory.annotation.Value("${file.backend.resource.file}")
private String resourceFile;
+ @org.springframework.beans.factory.annotation.Value("${file.backend.storage.file}")
+ private String storageFile;
+
@Override
public void init() {
logger.info("Initializing file based resource backend");
@@ -69,7 +72,33 @@
@Override
public Optional<SCPStorage> getSCPStorage(SCPStorageGetRequest request) throws Exception {
- throw new UnsupportedOperationException("Operation is not supported in backend");
+ InputStream inputStream = FileBasedResourceBackend.class.getClassLoader().getResourceAsStream(storageFile);
+
+ JSONParser jsonParser = new JSONParser();
+
+ try (InputStreamReader reader = new InputStreamReader(inputStream)) {
+
+ Object obj = jsonParser.parse(reader);
+
+ JSONArray resourceList = (JSONArray) obj;
+
+ List<SCPStorage> scpStorages = (List<SCPStorage>) resourceList.stream()
+ .filter(resource -> "SCP".equals(((JSONObject) resource).get("type").toString()))
+ .map(st -> {
+ JSONObject s = (JSONObject) st;
+
+ SCPStorage storage = SCPStorage.newBuilder()
+ .setStorageId(s.get("storageId").toString())
+ .setHost(s.get("host").toString())
+ .setUser(s.get("user").toString())
+ .setPort(Integer.parseInt(s.get("port").toString())).build();
+
+ return storage;
+
+ }).collect(Collectors.toList());
+
+ return scpStorages.stream().filter(s -> request.getStorageId().equals(s.getStorageId())).findFirst();
+ }
}
@Override
@@ -108,15 +137,9 @@
.map(resource -> {
JSONObject r = (JSONObject) resource;
- SCPStorage storage = SCPStorage.newBuilder()
- .setStorageId(((JSONObject)r.get("scpStorage")).get("storageId").toString())
- .setHost(((JSONObject)r.get("scpStorage")).get("host").toString())
- .setUser(((JSONObject)r.get("scpStorage")).get("user").toString())
- .setPort(Integer.parseInt(((JSONObject)r.get("scpStorage")).get("port").toString())).build();
-
SCPResource.Builder builder = SCPResource.newBuilder()
.setResourceId(r.get("resourceId").toString())
- .setScpStorage(storage);
+ .setScpStorage(SCPStorage.newBuilder().setStorageId(r.get("storageId").toString()).getDefaultInstanceForType());
switch (r.get("resourceMode").toString()) {
case "FILE":
@@ -156,6 +179,47 @@
}
@Override
+ public Optional<LocalStorage> getLocalStorage(LocalStorageGetRequest request) throws Exception {
+ JSONParser jsonParser = new JSONParser();
+ InputStream inputStream = FileBasedResourceBackend.class.getClassLoader().getResourceAsStream(storageFile);
+
+ try (InputStreamReader reader = new InputStreamReader(inputStream)) {
+ Object obj = jsonParser.parse(reader);
+
+ JSONArray storageList = (JSONArray) obj;
+
+ List<LocalStorage> localStorages = (List<LocalStorage>) storageList.stream()
+ .filter(storage -> "LOCAL".equals(((JSONObject) storage).get("type").toString()))
+ .map( storage -> {
+ JSONObject s = (JSONObject) storage;
+
+ LocalStorage st = LocalStorage.newBuilder()
+ .setStorageId(s.get("storageId").toString())
+ .setAgentId(s.get("agentId").toString())
+ .build();
+
+ return st;
+ }).collect(Collectors.toList());
+ return localStorages.stream().filter(s -> request.getStorageId().equals(s.getStorageId())).findFirst();
+ }
+ }
+
+ @Override
+ public LocalStorage createLocalStorage(LocalStorageCreateRequest request) throws Exception {
+ return null;
+ }
+
+ @Override
+ public boolean updateLocalStorage(LocalStorageUpdateRequest request) throws Exception {
+ return false;
+ }
+
+ @Override
+ public boolean deleteLocalStorage(LocalStorageDeleteRequest request) throws Exception {
+ return false;
+ }
+
+ @Override
public Optional<LocalResource> getLocalResource(LocalResourceGetRequest request) throws Exception {
JSONParser jsonParser = new JSONParser();
InputStream inputStream = FileBasedResourceBackend.class.getClassLoader().getResourceAsStream(resourceFile);
@@ -214,6 +278,48 @@
}
@Override
+ public Optional<S3Storage> getS3Storage(S3StorageGetRequest request) throws Exception {
+ JSONParser jsonParser = new JSONParser();
+ InputStream inputStream = FileBasedResourceBackend.class.getClassLoader().getResourceAsStream(storageFile);
+
+ try (InputStreamReader reader = new InputStreamReader(inputStream)) {
+ Object obj = jsonParser.parse(reader);
+
+ JSONArray storagesList = (JSONArray) obj;
+
+ List<S3Storage> s3Storages = (List<S3Storage>) storagesList.stream()
+ .filter(storage -> "S3".equals(((JSONObject) storage).get("type").toString()))
+ .map(storage -> {
+ JSONObject s = (JSONObject) storage;
+
+ S3Storage st = S3Storage.newBuilder()
+ .setStorageId(s.get("storageId").toString())
+ .setRegion(s.get("region").toString())
+ .setRegion(s.get("bucketName").toString())
+ .build();
+
+ return st;
+ }).collect(Collectors.toList());
+ return s3Storages.stream().filter(s -> request.getStorageId().equals(s.getStorageId())).findFirst();
+ }
+ }
+
+ @Override
+ public S3Storage createS3Storage(S3StorageCreateRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+ }
+
+ @Override
+ public boolean updateS3Storage(S3StorageUpdateRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+ }
+
+ @Override
+ public boolean deleteS3Storage(S3StorageDeleteRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+ }
+
+ @Override
public Optional<S3Resource> getS3Resource(S3ResourceGetRequest request) throws Exception {
JSONParser jsonParser = new JSONParser();
InputStream inputStream = FileBasedResourceBackend.class.getClassLoader().getResourceAsStream(resourceFile);
@@ -274,6 +380,47 @@
}
@Override
+ public Optional<BoxStorage> getBoxStorage(BoxStorageGetRequest request) throws Exception {
+ JSONParser jsonParser = new JSONParser();
+ InputStream inputStream = FileBasedResourceBackend.class.getClassLoader().getResourceAsStream(storageFile);
+
+ try (InputStreamReader reader = new InputStreamReader(inputStream)) {
+ Object obj = jsonParser.parse(reader);
+
+ JSONArray storageList = (JSONArray) obj;
+
+ System.out.println("All resources ");
+ List<BoxStorage> boxStorages = (List<BoxStorage>) storageList.stream()
+ .filter(storage -> "BOX".equals(((JSONObject) storage).get("type").toString()))
+ .map(storage -> {
+ JSONObject s = (JSONObject) storage;
+
+ BoxStorage st = BoxStorage.newBuilder()
+ .setStorageId(s.get("storageId").toString())
+ .build();
+
+ return st;
+ }).collect(Collectors.toList());
+ return boxStorages.stream().filter(s -> request.getStorageId().equals(s.getStorageId())).findFirst();
+ }
+ }
+
+ @Override
+ public BoxStorage createBoxStorage(BoxStorageCreateRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+ }
+
+ @Override
+ public boolean updateBoxStorage(BoxStorageUpdateRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+ }
+
+ @Override
+ public boolean deleteBoxStorage(BoxStorageDeleteRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+ }
+
+ @Override
public Optional<BoxResource> getBoxResource(BoxResourceGetRequest request) throws Exception {
JSONParser jsonParser = new JSONParser();
InputStream inputStream = FileBasedResourceBackend.class.getClassLoader().getResourceAsStream(resourceFile);
@@ -330,6 +477,47 @@
}
@Override
+ public Optional<AzureStorage> getAzureStorage(AzureStorageGetRequest request) throws Exception {
+ JSONParser jsonParser = new JSONParser();
+ InputStream inputStream = FileBasedResourceBackend.class.getClassLoader().getResourceAsStream(storageFile);
+
+ try (InputStreamReader reader = new InputStreamReader(inputStream)) {
+ Object obj = jsonParser.parse(reader);
+
+ JSONArray storageList = (JSONArray) obj;
+
+ List<AzureStorage> azureStorages = (List<AzureStorage>) storageList.stream()
+ .filter(storage -> "AZURE".equals(((JSONObject) storage).get("type").toString()))
+ .map(storage -> {
+ JSONObject s = (JSONObject) storage;
+
+ AzureStorage st = AzureStorage.newBuilder()
+ .setStorageId(s.get("storageId").toString())
+ .setContainer(s.get("container").toString())
+ .build();
+
+ return st;
+ }).collect(Collectors.toList());
+ return azureStorages.stream().filter(s -> request.getStorageId().equals(s.getStorageId())).findFirst();
+ }
+ }
+
+ @Override
+ public AzureStorage createAzureStorage(AzureStorageCreateRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+ }
+
+ @Override
+ public boolean updateAzureStorage(AzureStorageUpdateRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+ }
+
+ @Override
+ public boolean deleteAzureStorage(AzureStorageDeleteRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+ }
+
+ @Override
public Optional<AzureResource> getAzureResource(AzureResourceGetRequest request) throws Exception {
JSONParser jsonParser = new JSONParser();
InputStream inputStream = FileBasedResourceBackend.class.getClassLoader().getResourceAsStream(resourceFile);
@@ -386,6 +574,47 @@
}
@Override
+ public Optional<GCSStorage> getGCSStorage(GCSStorageGetRequest request) throws Exception {
+ JSONParser jsonParser = new JSONParser();
+ InputStream inputStream = FileBasedResourceBackend.class.getClassLoader().getResourceAsStream(storageFile);
+
+ try (InputStreamReader reader = new InputStreamReader(inputStream)) {
+ Object obj = jsonParser.parse(reader);
+
+ JSONArray storageList = (JSONArray) obj;
+
+ List<GCSStorage> gcsStorages = (List<GCSStorage>) storageList.stream()
+ .filter(storage -> "GCS".equals(((JSONObject) storage).get("type").toString()))
+ .map(storage -> {
+ JSONObject s = (JSONObject) storage;
+
+ GCSStorage st = GCSStorage.newBuilder()
+ .setStorageId(s.get("storageId").toString())
+ .setBucketName(s.get("bucketName").toString())
+ .build();
+
+ return st;
+ }).collect(Collectors.toList());
+ return gcsStorages.stream().filter(s -> request.getStorageId().equals(s.getStorageId())).findFirst();
+ }
+ }
+
+ @Override
+ public GCSStorage createGCSStorage(GCSStorageCreateRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+ }
+
+ @Override
+ public boolean updateGCSStorage(GCSStorageUpdateRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+ }
+
+ @Override
+ public boolean deleteGCSStorage(GCSStorageDeleteRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+ }
+
+ @Override
public Optional<GCSResource> getGCSResource(GCSResourceGetRequest request) throws Exception {
JSONParser jsonParser = new JSONParser();
InputStream inputStream = FileBasedResourceBackend.class.getClassLoader().getResourceAsStream(resourceFile);
@@ -440,6 +669,64 @@
public boolean deleteGCSResource(GCSResourceDeleteRequest request) throws Exception {
throw new UnsupportedOperationException("Operation is not supported in backend");
}
+
+ @Override
+ public Optional<DropboxStorage> getDropboxStorage(DropboxStorageGetRequest request) throws Exception {
+ JSONParser jsonParser = new JSONParser();
+ InputStream inputStream = FileBasedResourceBackend.class.getClassLoader().getResourceAsStream(resourceFile);
+
+ try (InputStreamReader reader = new InputStreamReader(inputStream)) {
+ Object obj = jsonParser.parse(reader);
+
+ JSONArray storageList = (JSONArray) obj;
+
+ List<DropboxStorage> dropboxStorages = (List<DropboxStorage>) storageList.stream()
+ .filter(resource -> "DROPBOX".equals(((JSONObject) resource).get("type").toString()))
+ .map(resource -> {
+ JSONObject r = (JSONObject) resource;
+ String resourcePath = r.get("resourcePath").toString();
+ resourcePath = resourcePath.startsWith("/") ? resourcePath : "/" + resourcePath;
+
+ DropboxStorage storage = DropboxStorage.newBuilder()
+ .setStorageId(((JSONObject)r.get("dropboxStorage")).get("storageId").toString())
+ .build();
+
+ DropboxResource.Builder builder = DropboxResource.newBuilder()
+ .setResourceId(r.get("resourceId").toString())
+ .setDropboxStorage(storage);
+
+ switch (r.get("resourceMode").toString()) {
+ case "FILE":
+ FileResource fileResource = FileResource.newBuilder().setResourcePath(r.get("resourcePath").toString()).build();
+ builder = builder.setFile(fileResource);
+ break;
+ case "DIRECTORY":
+ DirectoryResource directoryResource = DirectoryResource.newBuilder().setResourcePath(r.get("resourcePath").toString()).build();
+ builder = builder.setDirectory(directoryResource);
+ break;
+ }
+ return builder.build();
+
+ }).collect(Collectors.toList());
+ return dropboxStorages.stream().filter(s -> request.getStorageId().equals(s.getStorageId())).findFirst();
+ }
+ }
+
+ @Override
+ public DropboxStorage createDropboxStorage(DropboxStorageCreateRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+ }
+
+ @Override
+ public boolean updateDropboxStorage(DropboxStorageUpdateRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+ }
+
+ @Override
+ public boolean deleteDropboxStorage(DropboxStorageDeleteRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+ }
+
@Override
public Optional<DropboxResource> getDropboxResource(DropboxResourceGetRequest request) throws Exception {
JSONParser jsonParser = new JSONParser();
@@ -498,6 +785,55 @@
}
@Override
+ public Optional<FTPStorage> getFTPStorage(FTPStorageGetRequest request) throws Exception {
+ InputStream inputStream = FileBasedResourceBackend.class.getClassLoader().getResourceAsStream(storageFile);
+
+ JSONParser jsonParser = new JSONParser();
+
+ if (inputStream == null) {
+ throw new IOException("resources file not found");
+ }
+
+ try (InputStreamReader reader = new InputStreamReader(inputStream)) {
+
+ Object obj = jsonParser.parse(reader);
+
+ JSONArray storageList = (JSONArray) obj;
+
+ List<FTPStorage> ftpStorages = (List<FTPStorage>) storageList.stream()
+ .filter(storage -> "FTP".equals(((JSONObject) storage).get("type").toString()))
+ .map(storage -> {
+ JSONObject s = (JSONObject) storage;
+
+ FTPStorage st = FTPStorage.newBuilder()
+ .setStorageId(s.get("storageId").toString())
+ .setHost(s.get("host").toString())
+ .setPort(Integer.parseInt(s.get("port").toString())).build();
+
+ return st;
+
+ }).collect(Collectors.toList());
+
+ return ftpStorages.stream().filter(s -> request.getStorageId().equals(s.getStorageId())).findFirst();
+ }
+ }
+
+ @Override
+ public FTPStorage createFTPStorage(FTPStorageCreateRequest request) {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+ }
+
+ @Override
+ public boolean updateFTPStorage(FTPStorageUpdateRequest request) {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+ }
+
+ @Override
+ public boolean deleteFTPStorage(FTPStorageDeleteRequest request) {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+ }
+
+ @Override
public Optional<FTPResource> getFTPResource(FTPResourceGetRequest request) throws Exception {
InputStream inputStream = FileBasedResourceBackend.class.getClassLoader().getResourceAsStream(resourceFile);
@@ -559,24 +895,4 @@
public boolean deleteFTPResource(FTPResourceDeleteRequest request) {
throw new UnsupportedOperationException("Operation is not supported in backend");
}
-
- @Override
- public Optional<FTPStorage> getFTPStorage(FTPStorageGetRequest request) {
- throw new UnsupportedOperationException("Operation is not supported in backend");
- }
-
- @Override
- public FTPStorage createFTPStorage(FTPStorageCreateRequest request) {
- throw new UnsupportedOperationException("Operation is not supported in backend");
- }
-
- @Override
- public boolean updateFTPStorage(FTPStorageUpdateRequest request) {
- throw new UnsupportedOperationException("Operation is not supported in backend");
- }
-
- @Override
- public boolean deleteFTPStorage(FTPStorageDeleteRequest request) {
- throw new UnsupportedOperationException("Operation is not supported in backend");
- }
}
diff --git a/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/sql/SQLResourceBackend.java b/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/sql/SQLResourceBackend.java
index a070b33..caf5057 100644
--- a/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/sql/SQLResourceBackend.java
+++ b/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/sql/SQLResourceBackend.java
@@ -17,18 +17,23 @@
package org.apache.airavata.mft.resource.server.backend.sql;
-
import org.apache.airavata.mft.resource.server.backend.ResourceBackend;
import org.apache.airavata.mft.resource.server.backend.sql.entity.*;
import org.apache.airavata.mft.resource.server.backend.sql.repository.*;
import org.apache.airavata.mft.resource.stubs.azure.resource.*;
+import org.apache.airavata.mft.resource.stubs.azure.storage.*;
import org.apache.airavata.mft.resource.stubs.box.resource.*;
+import org.apache.airavata.mft.resource.stubs.box.storage.*;
import org.apache.airavata.mft.resource.stubs.dropbox.resource.*;
+import org.apache.airavata.mft.resource.stubs.dropbox.storage.*;
import org.apache.airavata.mft.resource.stubs.ftp.resource.*;
import org.apache.airavata.mft.resource.stubs.ftp.storage.*;
import org.apache.airavata.mft.resource.stubs.gcs.resource.*;
+import org.apache.airavata.mft.resource.stubs.gcs.storage.*;
import org.apache.airavata.mft.resource.stubs.local.resource.*;
+import org.apache.airavata.mft.resource.stubs.local.storage.*;
import org.apache.airavata.mft.resource.stubs.s3.resource.*;
+import org.apache.airavata.mft.resource.stubs.s3.storage.*;
import org.apache.airavata.mft.resource.stubs.scp.resource.*;
import org.apache.airavata.mft.resource.stubs.scp.storage.*;
import org.dozer.DozerBeanMapper;
@@ -121,6 +126,26 @@
}
@Override
+ public Optional<LocalStorage> getLocalStorage(LocalStorageGetRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+ }
+
+ @Override
+ public LocalStorage createLocalStorage(LocalStorageCreateRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+ }
+
+ @Override
+ public boolean updateLocalStorage(LocalStorageUpdateRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+ }
+
+ @Override
+ public boolean deleteLocalStorage(LocalStorageDeleteRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+ }
+
+ @Override
public Optional<LocalResource> getLocalResource(LocalResourceGetRequest request) {
Optional<LocalResourceEntity> resourceEntity = localResourceRepository.findByResourceId(request.getResourceId());
return resourceEntity.map(scpResourceEntity -> mapper.map(scpResourceEntity, LocalResource.newBuilder().getClass()).build());
@@ -145,6 +170,26 @@
}
@Override
+ public Optional<S3Storage> getS3Storage(S3StorageGetRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+ }
+
+ @Override
+ public S3Storage createS3Storage(S3StorageCreateRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+ }
+
+ @Override
+ public boolean updateS3Storage(S3StorageUpdateRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+ }
+
+ @Override
+ public boolean deleteS3Storage(S3StorageDeleteRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+ }
+
+ @Override
public Optional<S3Resource> getS3Resource(S3ResourceGetRequest request) throws Exception {
throw new UnsupportedOperationException("Operation is not supported in backend");
@@ -169,6 +214,26 @@
}
@Override
+ public Optional<BoxStorage> getBoxStorage(BoxStorageGetRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+ }
+
+ @Override
+ public BoxStorage createBoxStorage(BoxStorageCreateRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+ }
+
+ @Override
+ public boolean updateBoxStorage(BoxStorageUpdateRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+ }
+
+ @Override
+ public boolean deleteBoxStorage(BoxStorageDeleteRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+ }
+
+ @Override
public Optional<BoxResource> getBoxResource(BoxResourceGetRequest request) throws Exception {
throw new UnsupportedOperationException("Operation is not supported in backend");
}
@@ -189,6 +254,26 @@
}
@Override
+ public Optional<AzureStorage> getAzureStorage(AzureStorageGetRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+ }
+
+ @Override
+ public AzureStorage createAzureStorage(AzureStorageCreateRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+ }
+
+ @Override
+ public boolean updateAzureStorage(AzureStorageUpdateRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+ }
+
+ @Override
+ public boolean deleteAzureStorage(AzureStorageDeleteRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+ }
+
+ @Override
public Optional<AzureResource> getAzureResource(AzureResourceGetRequest request) throws Exception {
throw new UnsupportedOperationException("Operation is not supported in backend");
}
@@ -209,6 +294,26 @@
}
@Override
+ public Optional<GCSStorage> getGCSStorage(GCSStorageGetRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+ }
+
+ @Override
+ public GCSStorage createGCSStorage(GCSStorageCreateRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+ }
+
+ @Override
+ public boolean updateGCSStorage(GCSStorageUpdateRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+ }
+
+ @Override
+ public boolean deleteGCSStorage(GCSStorageDeleteRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+ }
+
+ @Override
public Optional<GCSResource> getGCSResource(GCSResourceGetRequest request) throws Exception {
throw new UnsupportedOperationException("Operation is not supported in backend");
}
@@ -229,6 +334,26 @@
}
@Override
+ public Optional<DropboxStorage> getDropboxStorage(DropboxStorageGetRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+ }
+
+ @Override
+ public DropboxStorage createDropboxStorage(DropboxStorageCreateRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+ }
+
+ @Override
+ public boolean updateDropboxStorage(DropboxStorageUpdateRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+ }
+
+ @Override
+ public boolean deleteDropboxStorage(DropboxStorageDeleteRequest request) throws Exception {
+ throw new UnsupportedOperationException("Operation is not supported in backend");
+ }
+
+ @Override
public Optional<DropboxResource> getDropboxResource(DropboxResourceGetRequest request) throws Exception {
throw new UnsupportedOperationException("Operation is not supported in backend");
}
diff --git a/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/handler/AzureServiceHandler.java b/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/handler/AzureServiceHandler.java
index 1175ff2..6808096 100644
--- a/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/handler/AzureServiceHandler.java
+++ b/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/handler/AzureServiceHandler.java
@@ -23,6 +23,7 @@
import org.apache.airavata.mft.resource.server.backend.ResourceBackend;
import org.apache.airavata.mft.resource.service.azure.AzureResourceServiceGrpc;
import org.apache.airavata.mft.resource.stubs.azure.resource.*;
+import org.apache.airavata.mft.resource.stubs.azure.storage.*;
import org.lognet.springboot.grpc.GRpcService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -102,4 +103,69 @@
}
}
+ @Override
+ public void getAzureStorage(AzureStorageGetRequest request, StreamObserver<AzureStorage> responseObserver) {
+ try {
+ this.backend.getAzureStorage(request).ifPresentOrElse(resource -> {
+ responseObserver.onNext(resource);
+ responseObserver.onCompleted();
+ }, () -> {
+ responseObserver.onError(Status.INTERNAL
+ .withDescription("No Azure storage with id " + request.getStorageId())
+ .asRuntimeException());
+ });
+ } catch (Exception e) {
+ logger.error("Failed in retrieving Azure resource with id {}", request.getStorageId(), e);
+
+ responseObserver.onError(Status.INTERNAL.withCause(e)
+ .withDescription("Failed in retrieving Azure resource with id " + request.getStorageId())
+ .asRuntimeException());
+ }
+ }
+
+ @Override
+ public void createAzureStorage(AzureStorageCreateRequest request, StreamObserver<AzureStorage> responseObserver) {
+ try {
+ responseObserver.onNext(this.backend.createAzureStorage(request));
+ responseObserver.onCompleted();
+ } catch (Exception e) {
+ logger.error("Failed in creating the Azure storage", e);
+
+ responseObserver.onError(Status.INTERNAL.withCause(e)
+ .withDescription("Failed in creating the Azure resource")
+ .asRuntimeException());
+ }
+ }
+
+ @Override
+ public void updateAzureStorage(AzureStorageUpdateRequest request, StreamObserver<Empty> responseObserver) {
+ try {
+ this.backend.updateAzureStorage(request);
+ responseObserver.onCompleted();
+ } catch (Exception e) {
+ logger.error("Failed in updating the Azure storage {}", request.getStorageId(), e);
+
+ responseObserver.onError(Status.INTERNAL.withCause(e)
+ .withDescription("Failed in updating the Azure storage with id " + request.getStorageId())
+ .asRuntimeException());
+ }
+ }
+
+ @Override
+ public void deleteAzureStorage(AzureStorageDeleteRequest request, StreamObserver<Empty> responseObserver) {
+ try {
+ boolean res = this.backend.deleteAzureStorage(request);
+ if (res) {
+ responseObserver.onCompleted();
+ } else {
+ responseObserver.onError(new Exception("Failed to delete Azure storage with id " + request.getStorageId()));
+ }
+ } catch (Exception e) {
+ logger.error("Failed in deleting the Azure storage {}", request.getStorageId(), e);
+
+ responseObserver.onError(Status.INTERNAL.withCause(e)
+ .withDescription("Failed in deleting the Azure storage with id " + request.getStorageId())
+ .asRuntimeException());
+ }
+ }
}
diff --git a/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/handler/BoxServiceHandler.java b/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/handler/BoxServiceHandler.java
index a17422c..5279d0a 100644
--- a/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/handler/BoxServiceHandler.java
+++ b/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/handler/BoxServiceHandler.java
@@ -23,6 +23,7 @@
import org.apache.airavata.mft.resource.server.backend.ResourceBackend;
import org.apache.airavata.mft.resource.service.box.BoxResourceServiceGrpc;
import org.apache.airavata.mft.resource.stubs.box.resource.*;
+import org.apache.airavata.mft.resource.stubs.box.storage.*;
import org.lognet.springboot.grpc.GRpcService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -67,7 +68,8 @@
responseObserver.onError(Status.INTERNAL.withCause(e)
.withDescription("Failed in creating the Box resource")
.asRuntimeException());
- } }
+ }
+ }
@Override
public void updateBoxResource(BoxResourceUpdateRequest request, StreamObserver<Empty> responseObserver) {
@@ -80,7 +82,8 @@
responseObserver.onError(Status.INTERNAL.withCause(e)
.withDescription("Failed in updating the Box resource with id " + request.getResourceId())
.asRuntimeException());
- } }
+ }
+ }
@Override
public void deleteBoxResource(BoxResourceDeleteRequest request, StreamObserver<Empty> responseObserver) {
@@ -99,4 +102,70 @@
.asRuntimeException());
}
}
+
+ @Override
+ public void getBoxStorage(BoxStorageGetRequest request, StreamObserver<BoxStorage> responseObserver) {
+ try {
+ this.backend.getBoxStorage(request).ifPresentOrElse(resource -> {
+ responseObserver.onNext(resource);
+ responseObserver.onCompleted();
+ }, () -> {
+ responseObserver.onError(Status.INTERNAL
+ .withDescription("No Box storage with id " + request.getStorageId())
+ .asRuntimeException());
+ });
+ } catch (Exception e) {
+ logger.error("Failed in retrieving Box resource with id {}", request.getStorageId(), e);
+
+ responseObserver.onError(Status.INTERNAL.withCause(e)
+ .withDescription("Failed in retrieving Box storage with id " + request.getStorageId())
+ .asRuntimeException());
+ }
+ }
+
+ @Override
+ public void createBoxStorage(BoxStorageCreateRequest request, StreamObserver<BoxStorage> responseObserver) {
+ try {
+ responseObserver.onNext(this.backend.createBoxStorage(request));
+ responseObserver.onCompleted();
+ } catch (Exception e) {
+ logger.error("Failed in creating the Box storage", e);
+
+ responseObserver.onError(Status.INTERNAL.withCause(e)
+ .withDescription("Failed in creating the Box storage")
+ .asRuntimeException());
+ }
+ }
+
+ @Override
+ public void updateBoxStorage(BoxStorageUpdateRequest request, StreamObserver<Empty> responseObserver) {
+ try {
+ this.backend.updateBoxStorage(request);
+ responseObserver.onCompleted();
+ } catch (Exception e) {
+ logger.error("Failed in updating the Box storage {}", request.getStorageId(), e);
+
+ responseObserver.onError(Status.INTERNAL.withCause(e)
+ .withDescription("Failed in updating the Box storage with id " + request.getStorageId())
+ .asRuntimeException());
+ }
+ }
+
+ @Override
+ public void deleteBoxStorage(BoxStorageDeleteRequest request, StreamObserver<Empty> responseObserver) {
+ try {
+ boolean res = this.backend.deleteBoxStorage(request);
+ if (res) {
+ responseObserver.onCompleted();
+ } else {
+ responseObserver.onError(new Exception("Failed to delete Box storage with id " + request.getStorageId()));
+ }
+ } catch (Exception e) {
+ logger.error("Failed in deleting the Box storage {}", request.getStorageId(), e);
+
+ responseObserver.onError(Status.INTERNAL.withCause(e)
+ .withDescription("Failed in deleting the Box storage with id " + request.getStorageId())
+ .asRuntimeException());
+ }
+ }
}
diff --git a/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/handler/DropboxServiceHandler.java b/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/handler/DropboxServiceHandler.java
index acdd1cc..0435361 100644
--- a/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/handler/DropboxServiceHandler.java
+++ b/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/handler/DropboxServiceHandler.java
@@ -23,6 +23,7 @@
import org.apache.airavata.mft.resource.server.backend.ResourceBackend;
import org.apache.airavata.mft.resource.service.dropbox.DropboxServiceGrpc;
import org.apache.airavata.mft.resource.stubs.dropbox.resource.*;
+import org.apache.airavata.mft.resource.stubs.dropbox.storage.*;
import org.lognet.springboot.grpc.GRpcService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -101,4 +102,70 @@
.asRuntimeException());
}
}
+
+ @Override
+ public void getDropboxStorage(DropboxStorageGetRequest request, StreamObserver<DropboxStorage> responseObserver) {
+ try {
+ this.backend.getDropboxStorage(request).ifPresentOrElse(resource -> {
+ responseObserver.onNext(resource);
+ responseObserver.onCompleted();
+ }, () -> {
+ responseObserver.onError(Status.INTERNAL
+ .withDescription("No dropbox storage with id " + request.getStorageId())
+ .asRuntimeException());
+ });
+ } catch (Exception e) {
+ logger.error("Failed in retrieving dropbox storage with id {}", request.getStorageId(), e);
+
+ responseObserver.onError(Status.INTERNAL.withCause(e)
+ .withDescription("Failed in retrieving dropbox storage with id " + request.getStorageId())
+ .asRuntimeException());
+ }
+ }
+
+ @Override
+ public void createDropboxStorage(DropboxStorageCreateRequest request, StreamObserver<DropboxStorage> responseObserver) {
+ try {
+ responseObserver.onNext(this.backend.createDropboxStorage(request));
+ responseObserver.onCompleted();
+ } catch (Exception e) {
+ logger.error("Failed in creating the dropbox storage", e);
+
+ responseObserver.onError(Status.INTERNAL.withCause(e)
+ .withDescription("Failed in creating the dropbox storage")
+ .asRuntimeException());
+ }
+ }
+
+ @Override
+ public void updateDropboxStorage(DropboxStorageUpdateRequest request, StreamObserver<Empty> responseObserver) {
+ try {
+ this.backend.updateDropboxStorage(request);
+ responseObserver.onCompleted();
+ } catch (Exception e) {
+ logger.error("Failed in updating the dropbox storage {}", request.getStorageId(), e);
+
+ responseObserver.onError(Status.INTERNAL.withCause(e)
+ .withDescription("Failed in updating the dropbox storage with id " + request.getStorageId())
+ .asRuntimeException());
+ }
+ }
+
+ @Override
+ public void deleteDropboxStorage(DropboxStorageDeleteRequest request, StreamObserver<Empty> responseObserver) {
+ try {
+ boolean res = this.backend.deleteDropboxStorage(request);
+ if (res) {
+ responseObserver.onCompleted();
+ } else {
+ responseObserver.onError(new Exception("Failed to delete dropbox storage with id " + request.getStorageId()));
+ }
+ } catch (Exception e) {
+ logger.error("Failed in deleting the dropbox storage {}", request.getStorageId(), e);
+
+ responseObserver.onError(Status.INTERNAL.withCause(e)
+ .withDescription("Failed in deleting the dropbox storage with id " + request.getStorageId())
+ .asRuntimeException());
+ }
+ }
}
diff --git a/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/handler/GCSServiceHandler.java b/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/handler/GCSServiceHandler.java
index 300e0e2..474ea62 100644
--- a/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/handler/GCSServiceHandler.java
+++ b/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/handler/GCSServiceHandler.java
@@ -23,6 +23,7 @@
import org.apache.airavata.mft.resource.server.backend.ResourceBackend;
import org.apache.airavata.mft.resource.service.gcs.GCSResourceServiceGrpc;
import org.apache.airavata.mft.resource.stubs.gcs.resource.*;
+import org.apache.airavata.mft.resource.stubs.gcs.storage.*;
import org.lognet.springboot.grpc.GRpcService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -101,4 +102,70 @@
.asRuntimeException());
}
}
+
+ @Override
+ public void getGCSStorage(GCSStorageGetRequest request, StreamObserver<GCSStorage> responseObserver) {
+ try {
+ this.backend.getGCSStorage(request).ifPresentOrElse(resource -> {
+ responseObserver.onNext(resource);
+ responseObserver.onCompleted();
+ }, () -> {
+ responseObserver.onError(Status.INTERNAL
+ .withDescription("No GCS storage with id " + request.getStorageId())
+ .asRuntimeException());
+ });
+ } catch (Exception e) {
+ logger.error("Failed in retrieving GCS storage with id {}", request.getStorageId(), e);
+
+ responseObserver.onError(Status.INTERNAL.withCause(e)
+ .withDescription("Failed in retrieving GCS storage with id " + request.getStorageId())
+ .asRuntimeException());
+ }
+ }
+
+ @Override
+ public void createGCSStorage(GCSStorageCreateRequest request, StreamObserver<GCSStorage> responseObserver) {
+ try {
+ responseObserver.onNext(this.backend.createGCSStorage(request));
+ responseObserver.onCompleted();
+ } catch (Exception e) {
+ logger.error("Failed in creating the GCS storage", e);
+
+ responseObserver.onError(Status.INTERNAL.withCause(e)
+ .withDescription("Failed in creating the GCS storage")
+ .asRuntimeException());
+ }
+ }
+
+ @Override
+ public void updateGCSStorage(GCSStorageUpdateRequest request, StreamObserver<Empty> responseObserver) {
+ try {
+ this.backend.updateGCSStorage(request);
+ responseObserver.onCompleted();
+ } catch (Exception e) {
+ logger.error("Failed in updating the GCS storage {}", request.getStorageId(), e);
+
+ responseObserver.onError(Status.INTERNAL.withCause(e)
+ .withDescription("Failed in updating the GCS storage with id " + request.getStorageId())
+ .asRuntimeException());
+ }
+ }
+
+ @Override
+ public void deleteGCSStorage(GCSStorageDeleteRequest request, StreamObserver<Empty> responseObserver) {
+ try {
+ boolean res = this.backend.deleteGCSStorage(request);
+ if (res) {
+ responseObserver.onCompleted();
+ } else {
+ responseObserver.onError(new Exception("Failed to delete GCS storage with id " + request.getStorageId()));
+ }
+ } catch (Exception e) {
+ logger.error("Failed in deleting the GCS storage {}", request.getStorageId(), e);
+
+ responseObserver.onError(Status.INTERNAL.withCause(e)
+ .withDescription("Failed in deleting the GCS storage with id " + request.getStorageId())
+ .asRuntimeException());
+ }
+ }
}
diff --git a/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/handler/LocalServiceHandler.java b/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/handler/LocalServiceHandler.java
index 212bd24..122ac03 100644
--- a/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/handler/LocalServiceHandler.java
+++ b/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/handler/LocalServiceHandler.java
@@ -23,6 +23,7 @@
import org.apache.airavata.mft.resource.server.backend.ResourceBackend;
import org.apache.airavata.mft.resource.service.local.LocalResourceServiceGrpc;
import org.apache.airavata.mft.resource.stubs.local.resource.*;
+import org.apache.airavata.mft.resource.stubs.local.storage.*;
import org.lognet.springboot.grpc.GRpcService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -103,4 +104,69 @@
}
}
+ @Override
+ public void getLocalStorage(LocalStorageGetRequest request, StreamObserver<LocalStorage> responseObserver) {
+ try {
+ this.backend.getLocalStorage(request).ifPresentOrElse(resource -> {
+ responseObserver.onNext(resource);
+ responseObserver.onCompleted();
+ }, () -> {
+ responseObserver.onError(Status.INTERNAL
+ .withDescription("No Local storage with id " + request.getStorageId())
+ .asRuntimeException());
+ });
+ } catch (Exception e) {
+ logger.error("Failed in retrieving storage with id {}", request.getStorageId(), e);
+
+ responseObserver.onError(Status.INTERNAL.withCause(e)
+ .withDescription("Failed in retrieving storage with id " + request.getStorageId())
+ .asRuntimeException());
+ }
+ }
+
+ @Override
+ public void createLocalStorage(LocalStorageCreateRequest request, StreamObserver<LocalStorage> responseObserver) {
+ try {
+ responseObserver.onNext(this.backend.createLocalStorage(request));
+ responseObserver.onCompleted();
+ } catch (Exception e) {
+ logger.error("Failed in creating the local storage", e);
+
+ responseObserver.onError(Status.INTERNAL.withCause(e)
+ .withDescription("Failed in creating the local storage")
+ .asRuntimeException());
+ }
+ }
+
+ @Override
+ public void updateLocalStorage(LocalStorageUpdateRequest request, StreamObserver<Empty> responseObserver) {
+ try {
+ this.backend.updateLocalStorage(request);
+ responseObserver.onCompleted();
+ } catch (Exception e) {
+ logger.error("Failed in updating the local storage {}", request.getStorageId(), e);
+
+ responseObserver.onError(Status.INTERNAL.withCause(e)
+ .withDescription("Failed in updating the local storge with id " + request.getStorageId())
+ .asRuntimeException());
+ }
+ }
+
+ @Override
+ public void deleteLocalStorage(LocalStorageDeleteRequest request, StreamObserver<Empty> responseObserver) {
+ try {
+ boolean res = this.backend.deleteLocalStorage(request);
+ if (res) {
+ responseObserver.onCompleted();
+ } else {
+ responseObserver.onError(new Exception("Failed to delete Local storage with id " + request.getStorageId()));
+ }
+ } catch (Exception e) {
+ logger.error("Failed in deleting the local storage {}", request.getStorageId(), e);
+
+ responseObserver.onError(Status.INTERNAL.withCause(e)
+ .withDescription("Failed in deleting the local storage with id " + request.getStorageId())
+ .asRuntimeException());
+ }
+ }
}
diff --git a/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/handler/S3ServiceHandler.java b/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/handler/S3ServiceHandler.java
index 753bd6a..53e0dcb 100644
--- a/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/handler/S3ServiceHandler.java
+++ b/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/handler/S3ServiceHandler.java
@@ -23,6 +23,7 @@
import org.apache.airavata.mft.resource.server.backend.ResourceBackend;
import org.apache.airavata.mft.resource.service.s3.S3ResourceServiceGrpc;
import org.apache.airavata.mft.resource.stubs.s3.resource.*;
+import org.apache.airavata.mft.resource.stubs.s3.storage.*;
import org.lognet.springboot.grpc.GRpcService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -101,4 +102,70 @@
.asRuntimeException());
}
}
+
+ @Override
+ public void getS3Storage(S3StorageGetRequest request, StreamObserver<S3Storage> responseObserver) {
+ try {
+ this.backend.getS3Storage(request).ifPresentOrElse(resource -> {
+ responseObserver.onNext(resource);
+ responseObserver.onCompleted();
+ }, () -> {
+ responseObserver.onError(Status.INTERNAL
+ .withDescription("No S3 storage with id " + request.getStorageId())
+ .asRuntimeException());
+ });
+ } catch (Exception e) {
+ logger.error("Failed in retrieving S3 storage with id {}", request.getStorageId(), e);
+
+ responseObserver.onError(Status.INTERNAL.withCause(e)
+ .withDescription("Failed in retrieving S3 storage with id " + request.getStorageId())
+ .asRuntimeException());
+ }
+ }
+
+ @Override
+ public void createS3Storage(S3StorageCreateRequest request, StreamObserver<S3Storage> responseObserver) {
+ try {
+ responseObserver.onNext(this.backend.createS3Storage(request));
+ responseObserver.onCompleted();
+ } catch (Exception e) {
+ logger.error("Failed in creating the S3 storage", e);
+
+ responseObserver.onError(Status.INTERNAL.withCause(e)
+ .withDescription("Failed in creating the S3 storage")
+ .asRuntimeException());
+ }
+ }
+
+ @Override
+ public void updateS3Storage(S3StorageUpdateRequest request, StreamObserver<Empty> responseObserver) {
+ try {
+ this.backend.updateS3Storage(request);
+ responseObserver.onCompleted();
+ } catch (Exception e) {
+ logger.error("Failed in updating the S3 storage {}", request.getStorageId(), e);
+
+ responseObserver.onError(Status.INTERNAL.withCause(e)
+ .withDescription("Failed in updating the S3 storage with id " + request.getStorageId())
+ .asRuntimeException());
+ }
+ }
+
+ @Override
+ public void deleteS3Storage(S3StorageDeleteRequest request, StreamObserver<Empty> responseObserver) {
+ try {
+ boolean res = this.backend.deleteS3Storage(request);
+ if (res) {
+ responseObserver.onCompleted();
+ } else {
+ responseObserver.onError(new Exception("Failed to delete S3 storage with id " + request.getStorageId()));
+ }
+ } catch (Exception e) {
+ logger.error("Failed in deleting the S3 storage {}", request.getStorageId(), e);
+
+ responseObserver.onError(Status.INTERNAL.withCause(e)
+ .withDescription("Failed in deleting the S3 storage with id " + request.getStorageId())
+ .asRuntimeException());
+ }
+ }
}
diff --git a/services/resource-service/server/src/main/resources/application.properties b/services/resource-service/server/src/main/resources/application.properties
index 51872df..3dbfa86 100644
--- a/services/resource-service/server/src/main/resources/application.properties
+++ b/services/resource-service/server/src/main/resources/application.properties
@@ -24,4 +24,5 @@
airavata.backend.registry.server.port=8970
# Configurations for File Backend
-file.backend.resource.file=resources.json
\ No newline at end of file
+file.backend.resource.file=resources.json
+file.backend.storage.file=storages.json
\ No newline at end of file
diff --git a/services/resource-service/server/src/main/resources/resources.json b/services/resource-service/server/src/main/resources/resources.json
index a389282..3d7b62f 100644
--- a/services/resource-service/server/src/main/resources/resources.json
+++ b/services/resource-service/server/src/main/resources/resources.json
@@ -4,114 +4,76 @@
"resourceId": "remote-ssh-dir-resource",
"resourceMode": "DIRECTORY",
"resourcePath": "/tmp",
- "scpStorage" : {
- "storageId": "remote-ssh-storage",
- "host": "149.165.156.124",
- "port": 22,
- "user": "root"
- }
+ "storageId" : "remote-ssh-storage"
},
{
"type": "SCP",
"resourceId": "remote-ssh-resource",
"resourceMode": "FILE",
"resourcePath": "/tmp/1mb.txt",
- "scpStorage" : {
- "storageId": "remote-ssh-storage",
- "host": "149.165.156.124",
- "port": 22,
- "user": "root"
- }
+ "storageId" : "remote-ssh-storage"
},
{
"type": "SCP",
"resourceId": "remote-ssh-resource2",
"resourceMode": "FILE",
"resourcePath": "/tmp/10mb.txt",
- "scpStorage" : {
- "storageId": "remote-ssh-storage",
- "host": "149.165.156.124",
- "port": 22,
- "user": "root"
- }
+ "storageId" : "remote-ssh-storage"
},
{
"type": "LOCAL",
"resourceId": "10mb-file",
"resourceMode": "FILE",
"resourcePath": "/tmp/10mb.txt",
- "localStorage": {
- "storageId": "local-storage-1",
- "agentId": "agent-0"
- }
+ "storageId": "local-storage-1"
},
{
"type": "S3",
"resourceId": "s3-file",
"resourceMode": "FILE",
"resourcePath": "10mb-s3.txt",
- "s3Storage": {
- "storageId": "s3-storage-1",
- "region": "us-east-2",
- "bucketName": "airavata-s3"
- }
+ "storageId": "s3-storage-1"
},
{
"type": "BOX",
"resourceId": "box-file-abcd",
"resourceMode": "FILE",
"resourcePath": "655108198452",
- "boxStorage" : {
- "storageId": "box-storage-1"
- }
+ "storageId" : "box-storage-1"
},
{
"type": "BOX",
"resourceId": "box-file-efgh",
"resourceMode": "FILE",
"resourcePath": "655450661536",
- "boxStorage" : {
- "storageId": "box-storage-1"
- }
+ "storageId" : "box-storage-1"
},
{
"type": "AZURE",
"resourceId": "azure-blob",
"resourceMode": "FILE",
"resourcePath": "sample.blob",
- "azureStorage" : {
- "storageId": "azure-storage-1",
- "container": "sample-container"
- }
+ "storageId" : "azure-storage-1"
},
{
"type": "GCS",
"resourceId": "gcs-bucket",
"resourceMode": "FILE",
"resourcePath": "PikaPikaTest.txt",
- "gcsStorage": {
- "storageId": "gcs-storage-1",
- "bucketName": "pika-pika-bucket"
- }
+ "storageId": "gcs-storage-1"
},
{
"type": "DROPBOX",
"resourceId": "dropbox-file",
"resourceMode": "FILE",
"resourcePath": "/test.txt",
- "dropboxStorage": {
- "storageId": "dropbox-storage-1"
- }
+ "storageId": "dropbox-storage-1"
},
{
"type": "FTP",
"resourceId": "ftp-resource",
"resourceMode": "FILE",
"resourcePath": "mft-1mb.txt",
- "ftpStorage": {
- "storageId": "ftp-resource",
- "host": "ftp.dlptest.com",
- "port": "21"
- }
+ "storageId": "ftp-resource"
}
]
\ No newline at end of file
diff --git a/services/resource-service/server/src/main/resources/storages.json b/services/resource-service/server/src/main/resources/storages.json
new file mode 100644
index 0000000..69d0b13
--- /dev/null
+++ b/services/resource-service/server/src/main/resources/storages.json
@@ -0,0 +1,44 @@
+[
+ {
+ "type": "SCP",
+ "storageId": "remote-ssh-storage",
+ "host": "149.165.156.124",
+ "port": 22,
+ "user": "root"
+ },
+ {
+ "type": "LOCAL",
+ "storageId": "local-storage-1",
+ "agentId": "agent-0"
+ },
+ {
+ "type": "S3",
+ "storageId": "s3-storage-1",
+ "region": "us-east-2",
+ "bucketName": "airavata-s3"
+ },
+ {
+ "type": "BOX",
+ "storageId": "box-storage-1"
+ },
+ {
+ "type": "AZURE",
+ "storageId": "azure-storage-1",
+ "container": "sample-container"
+ },
+ {
+ "type": "GCS",
+ "storageId": "gcs-storage-1",
+ "bucketName": "pika-pika-bucket"
+ },
+ {
+ "type": "DROPBOX",
+ "storageId": "dropbox-storage-1"
+ },
+ {
+ "type": "FTP",
+ "storageId": "ftp-resource",
+ "host": "ftp.dlptest.com",
+ "port": "21"
+ }
+]
\ No newline at end of file
diff --git a/transport/azure-transport/src/main/java/org/apache/airavata/mft/transport/azure/AzureMetadataCollector.java b/transport/azure-transport/src/main/java/org/apache/airavata/mft/transport/azure/AzureMetadataCollector.java
index 5a64d54..76a6f13 100644
--- a/transport/azure-transport/src/main/java/org/apache/airavata/mft/transport/azure/AzureMetadataCollector.java
+++ b/transport/azure-transport/src/main/java/org/apache/airavata/mft/transport/azure/AzureMetadataCollector.java
@@ -32,6 +32,9 @@
import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
import org.apache.airavata.mft.resource.stubs.azure.resource.AzureResource;
import org.apache.airavata.mft.resource.stubs.azure.resource.AzureResourceGetRequest;
+import org.apache.airavata.mft.resource.stubs.azure.storage.AzureStorage;
+import org.apache.airavata.mft.resource.stubs.azure.storage.AzureStorageGetRequest;
+import org.apache.airavata.mft.resource.stubs.common.FileResource;
import org.apache.airavata.mft.secret.client.SecretServiceClient;
import org.apache.airavata.mft.secret.client.SecretServiceClientBuilder;
@@ -95,7 +98,7 @@
}
@Override
- public FileResourceMetadata getFileResourceMetadata(String parentResourceId, String resourcePath, String credentialToken) throws Exception {
+ public FileResourceMetadata getFileResourceMetadata(String storageId, String resourcePath, String credentialToken) throws Exception {
throw new UnsupportedOperationException("Method not implemented");
}
@@ -104,7 +107,7 @@
throw new UnsupportedOperationException("Method not implemented"); }
@Override
- public DirectoryResourceMetadata getDirectoryResourceMetadata(String parentResourceId, String resourcePath, String credentialToken) throws Exception {
+ public DirectoryResourceMetadata getDirectoryResourceMetadata(String storageId, String resourcePath, String credentialToken) throws Exception {
throw new UnsupportedOperationException("Method not implemented");
}
@@ -115,6 +118,20 @@
ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
AzureResource azureResource = resourceClient.azure().getAzureResource(AzureResourceGetRequest.newBuilder().setResourceId(resourceId).build());
+ return isAvailable(azureResource, credentialToken);
+ }
+
+ @Override
+ public Boolean isAvailable(String storageId, String resourcePath, String credentialToken) throws Exception {
+ ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
+ AzureStorage azureStorage = resourceClient.azure().getAzureStorage(AzureStorageGetRequest.newBuilder().setStorageId(storageId).build());
+
+ AzureResource azureResource = AzureResource.newBuilder().setFile(FileResource.newBuilder().setResourcePath(resourcePath).build()).setAzureStorage(azureStorage).build();
+ return isAvailable(azureResource, credentialToken);
+ }
+
+ public Boolean isAvailable(AzureResource azureResource, String credentialToken) throws Exception {
+
SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
AzureSecret azureSecret = secretClient.azure().getAzureSecret(AzureSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
@@ -132,4 +149,5 @@
}
return false;
}
+
}
diff --git a/transport/azure-transport/src/main/java/org/apache/airavata/mft/transport/azure/AzureReceiver.java b/transport/azure-transport/src/main/java/org/apache/airavata/mft/transport/azure/AzureReceiver.java
index a9bfdf7..dab9337 100644
--- a/transport/azure-transport/src/main/java/org/apache/airavata/mft/transport/azure/AzureReceiver.java
+++ b/transport/azure-transport/src/main/java/org/apache/airavata/mft/transport/azure/AzureReceiver.java
@@ -29,8 +29,8 @@
import org.apache.airavata.mft.credential.stubs.azure.AzureSecretGetRequest;
import org.apache.airavata.mft.resource.client.ResourceServiceClient;
import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
-import org.apache.airavata.mft.resource.stubs.azure.resource.AzureResource;
-import org.apache.airavata.mft.resource.stubs.azure.resource.AzureResourceGetRequest;
+import org.apache.airavata.mft.resource.stubs.azure.storage.AzureStorage;
+import org.apache.airavata.mft.resource.stubs.azure.storage.AzureStorageGetRequest;
import org.apache.airavata.mft.secret.client.SecretServiceClient;
import org.apache.airavata.mft.secret.client.SecretServiceClientBuilder;
import org.slf4j.Logger;
@@ -43,21 +43,20 @@
private static final Logger logger = LoggerFactory.getLogger(AzureReceiver.class);
private boolean initialized = false;
- private AzureResource azureResource;
BlobContainerClient containerClient;
@Override
- public void init(String resourceId, String credentialToken, String resourceServiceHost, int resourceServicePort, String secretServiceHost, int secretServicePort) throws Exception {
+ public void init(String storageId, String credentialToken, String resourceServiceHost, int resourceServicePort, String secretServiceHost, int secretServicePort) throws Exception {
this.initialized = true;
ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
- this.azureResource = resourceClient.azure().getAzureResource(AzureResourceGetRequest.newBuilder().setResourceId(resourceId).build());
+ AzureStorage azureStorage = resourceClient.azure().getAzureStorage(AzureStorageGetRequest.newBuilder().setStorageId(storageId).build());
SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
AzureSecret azureSecret = secretClient.azure().getAzureSecret(AzureSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
BlobServiceClient blobServiceClient = new BlobServiceClientBuilder().connectionString(azureSecret.getConnectionString()).buildClient();
- this.containerClient = blobServiceClient.getBlobContainerClient(azureResource.getAzureStorage().getContainer());
+ this.containerClient = blobServiceClient.getBlobContainerClient(azureStorage.getContainer());
}
@Override
@@ -72,49 +71,41 @@
}
@Override
- public void startStream(ConnectorContext context) throws Exception {
+ public void startStream(String targetPath, ConnectorContext context) throws Exception {
logger.info("Starting azure receive for remote server for transfer {}", context.getTransferId());
checkInitialized();
- if (ResourceTypes.FILE.equals(this.azureResource.getResourceCase().name())) {
- BlobClient blobClient = containerClient.getBlobClient(azureResource.getFile().getResourcePath());
- BlobInputStream blobInputStream = blobClient.openInputStream();
+ BlobClient blobClient = containerClient.getBlobClient(targetPath);
+ BlobInputStream blobInputStream = blobClient.openInputStream();
- OutputStream streamOs = context.getStreamBuffer().getOutputStream();
+ OutputStream streamOs = context.getStreamBuffer().getOutputStream();
- long fileSize = context.getMetadata().getResourceSize();
+ long fileSize = context.getMetadata().getResourceSize();
- byte[] buf = new byte[1024];
- while (true) {
- int bufSize = 0;
+ byte[] buf = new byte[1024];
+ while (true) {
+ int bufSize = 0;
- if (buf.length < fileSize) {
- bufSize = buf.length;
- } else {
- bufSize = (int) fileSize;
- }
- bufSize = blobInputStream.read(buf, 0, bufSize);
+ if (buf.length < fileSize) {
+ bufSize = buf.length;
+ } else {
+ bufSize = (int) fileSize;
+ }
+ bufSize = blobInputStream.read(buf, 0, bufSize);
- if (bufSize < 0) {
- break;
- }
-
- streamOs.write(buf, 0, bufSize);
- streamOs.flush();
-
- fileSize -= bufSize;
- if (fileSize == 0L)
- break;
+ if (bufSize < 0) {
+ break;
}
- streamOs.close();
- logger.info("Completed azure receive for remote server for transfer {}", context.getTransferId());
+ streamOs.write(buf, 0, bufSize);
+ streamOs.flush();
- } else {
- logger.error("Resource {} should be a FILE type. Found a {}",
- this.azureResource.getResourceId(), this.azureResource.getResourceCase().name());
- throw new Exception("Resource " + this.azureResource.getResourceId() + " should be a FILE type. Found a " +
- this.azureResource.getResourceCase().name());
+ fileSize -= bufSize;
+ if (fileSize == 0L)
+ break;
}
+
+ streamOs.close();
+ logger.info("Completed azure receive for remote server for transfer {}", context.getTransferId());
}
}
diff --git a/transport/azure-transport/src/main/java/org/apache/airavata/mft/transport/azure/AzureSender.java b/transport/azure-transport/src/main/java/org/apache/airavata/mft/transport/azure/AzureSender.java
index 7dcff4b..ef3249e 100644
--- a/transport/azure-transport/src/main/java/org/apache/airavata/mft/transport/azure/AzureSender.java
+++ b/transport/azure-transport/src/main/java/org/apache/airavata/mft/transport/azure/AzureSender.java
@@ -30,6 +30,8 @@
import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
import org.apache.airavata.mft.resource.stubs.azure.resource.AzureResource;
import org.apache.airavata.mft.resource.stubs.azure.resource.AzureResourceGetRequest;
+import org.apache.airavata.mft.resource.stubs.azure.storage.AzureStorage;
+import org.apache.airavata.mft.resource.stubs.azure.storage.AzureStorageGetRequest;
import org.apache.airavata.mft.secret.client.SecretServiceClient;
import org.apache.airavata.mft.secret.client.SecretServiceClientBuilder;
import org.slf4j.Logger;
@@ -40,21 +42,20 @@
private static final Logger logger = LoggerFactory.getLogger(AzureSender.class);
private boolean initialized = false;
- private AzureResource azureResource;
BlobContainerClient containerClient;
@Override
- public void init(String resourceId, String credentialToken, String resourceServiceHost, int resourceServicePort, String secretServiceHost, int secretServicePort) throws Exception {
+ public void init(String storageId, String credentialToken, String resourceServiceHost, int resourceServicePort, String secretServiceHost, int secretServicePort) throws Exception {
this.initialized = true;
ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
- this.azureResource = resourceClient.azure().getAzureResource(AzureResourceGetRequest.newBuilder().setResourceId(resourceId).build());
+ AzureStorage azureStorage = resourceClient.azure().getAzureStorage(AzureStorageGetRequest.newBuilder().setStorageId(storageId).build());
SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
AzureSecret azureSecret = secretClient.azure().getAzureSecret(AzureSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
BlobServiceClient blobServiceClient = new BlobServiceClientBuilder().connectionString(azureSecret.getConnectionString()).buildClient();
- this.containerClient = blobServiceClient.getBlobContainerClient(azureResource.getAzureStorage().getContainer());
+ this.containerClient = blobServiceClient.getBlobContainerClient(azureStorage.getContainer());
}
@Override
@@ -69,20 +70,13 @@
}
@Override
- public void startStream(ConnectorContext context) throws Exception {
+ public void startStream(String targetPath, ConnectorContext context) throws Exception {
logger.info("Starting Azure send for remote server for transfer {}", context.getTransferId());
checkInitialized();
- if (ResourceTypes.FILE.equals(this.azureResource.getResourceCase().name())) {
- BlockBlobClient blockBlobClient = containerClient.getBlobClient(azureResource.getFile().getResourcePath()).getBlockBlobClient();
- blockBlobClient.upload(context.getStreamBuffer().getInputStream(), context.getMetadata().getResourceSize(), true);
- logger.info("Completed Azure send for remote server for transfer {}", context.getTransferId());
- } else {
- logger.error("Resource {} should be a FILE type. Found a {}",
- this.azureResource.getResourceId(), this.azureResource.getResourceCase().name());
- throw new Exception("Resource " + this.azureResource.getResourceId() + " should be a FILE type. Found a " +
- this.azureResource.getResourceCase().name());
- }
+ BlockBlobClient blockBlobClient = containerClient.getBlobClient(targetPath).getBlockBlobClient();
+ blockBlobClient.upload(context.getStreamBuffer().getInputStream(), context.getMetadata().getResourceSize(), true);
+ logger.info("Completed Azure send for remote server for transfer {}", context.getTransferId());
}
}
diff --git a/transport/box-transport/src/main/java/org/apache/airavata/mft/transport/box/BoxMetadataCollector.java b/transport/box-transport/src/main/java/org/apache/airavata/mft/transport/box/BoxMetadataCollector.java
index 8e21c9f..5e77e72 100644
--- a/transport/box-transport/src/main/java/org/apache/airavata/mft/transport/box/BoxMetadataCollector.java
+++ b/transport/box-transport/src/main/java/org/apache/airavata/mft/transport/box/BoxMetadataCollector.java
@@ -30,6 +30,9 @@
import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
import org.apache.airavata.mft.resource.stubs.box.resource.BoxResource;
import org.apache.airavata.mft.resource.stubs.box.resource.BoxResourceGetRequest;
+import org.apache.airavata.mft.resource.stubs.box.storage.BoxStorage;
+import org.apache.airavata.mft.resource.stubs.box.storage.BoxStorageGetRequest;
+import org.apache.airavata.mft.resource.stubs.common.FileResource;
import org.apache.airavata.mft.secret.client.SecretServiceClient;
import org.apache.airavata.mft.secret.client.SecretServiceClientBuilder;
@@ -84,7 +87,7 @@
}
@Override
- public FileResourceMetadata getFileResourceMetadata(String parentResourceId, String resourcePath, String credentialToken) throws Exception {
+ public FileResourceMetadata getFileResourceMetadata(String storageId, String resourcePath, String credentialToken) throws Exception {
throw new UnsupportedOperationException("Method not implemented");
}
@@ -93,7 +96,7 @@
throw new UnsupportedOperationException("Method not implemented"); }
@Override
- public DirectoryResourceMetadata getDirectoryResourceMetadata(String parentResourceId, String resourcePath, String credentialToken) throws Exception {
+ public DirectoryResourceMetadata getDirectoryResourceMetadata(String storageId, String resourcePath, String credentialToken) throws Exception {
throw new UnsupportedOperationException("Method not implemented");
}
@@ -104,7 +107,22 @@
ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
BoxResource boxResource = resourceClient.box().getBoxResource(BoxResourceGetRequest.newBuilder().setResourceId(resourceId).build());
+ return isAvailable(boxResource, credentialToken);
+ }
+ @Override
+ public Boolean isAvailable(String storageId, String resourcePath, String credentialToken) throws Exception {
+ checkInitialized();
+
+ ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
+ BoxStorage boxStorage = resourceClient.box().getBoxStorage(BoxStorageGetRequest.newBuilder().setStorageId(storageId).build());
+
+ BoxResource boxResource = BoxResource.newBuilder().setFile(FileResource.newBuilder().setResourcePath(resourcePath).build()).setBoxStorage(boxStorage).build();
+
+ return isAvailable(boxResource, credentialToken);
+ }
+
+ private Boolean isAvailable(BoxResource boxResource, String credentialToken) throws Exception {
SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
BoxSecret boxSecret = secretClient.box().getBoxSecret(BoxSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
diff --git a/transport/box-transport/src/main/java/org/apache/airavata/mft/transport/box/BoxReceiver.java b/transport/box-transport/src/main/java/org/apache/airavata/mft/transport/box/BoxReceiver.java
index 907b485..e041e35 100644
--- a/transport/box-transport/src/main/java/org/apache/airavata/mft/transport/box/BoxReceiver.java
+++ b/transport/box-transport/src/main/java/org/apache/airavata/mft/transport/box/BoxReceiver.java
@@ -29,6 +29,8 @@
import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
import org.apache.airavata.mft.resource.stubs.box.resource.BoxResource;
import org.apache.airavata.mft.resource.stubs.box.resource.BoxResourceGetRequest;
+import org.apache.airavata.mft.resource.stubs.box.storage.BoxStorage;
+import org.apache.airavata.mft.resource.stubs.box.storage.BoxStorageGetRequest;
import org.apache.airavata.mft.secret.client.SecretServiceClient;
import org.apache.airavata.mft.secret.client.SecretServiceClientBuilder;
import org.slf4j.Logger;
@@ -40,20 +42,14 @@
public class BoxReceiver implements Connector {
private static final Logger logger = LoggerFactory.getLogger(BoxReceiver.class);
- private BoxSecret boxSecret;
- private BoxResource boxResource;
private BoxAPIConnection boxClient;
-
@Override
- public void init(String resourceId, String credentialToken, String resourceServiceHost, int resourceServicePort,
+ public void init(String storageId, String credentialToken, String resourceServiceHost, int resourceServicePort,
String secretServiceHost, int secretServicePort) throws Exception {
- ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
- boxResource = resourceClient.box().getBoxResource(BoxResourceGetRequest.newBuilder().setResourceId(resourceId).build());
-
SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
- boxSecret = secretClient.box().getBoxSecret(BoxSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
+ BoxSecret boxSecret = secretClient.box().getBoxSecret(BoxSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
boxClient = new BoxAPIConnection(boxSecret.getAccessToken());
}
@@ -64,25 +60,17 @@
}
@Override
- public void startStream(ConnectorContext context) throws Exception {
+ public void startStream(String targetPath, ConnectorContext context) throws Exception {
- if (ResourceTypes.FILE.equals(this.boxResource.getResourceCase().name())) {
- logger.info("Starting Box Receiver stream for transfer {}", context.getTransferId());
+ logger.info("Starting Box Receiver stream for transfer {}", context.getTransferId());
- BoxFile file = new BoxFile(this.boxClient, this.boxResource.getFile().getResourcePath());
+ BoxFile file = new BoxFile(this.boxClient, targetPath);
- OutputStream os = context.getStreamBuffer().getOutputStream();
- file.download(os);
- os.flush();
- os.close();
+ OutputStream os = context.getStreamBuffer().getOutputStream();
+ file.download(os);
+ os.flush();
+ os.close();
- logger.info("Completed Box Receiver stream for transfer {}", context.getTransferId());
-
- } else {
- logger.error("Resource {} should be a FILE type. Found a {}",
- this.boxResource.getResourceId(), this.boxResource.getResourceCase().name());
- throw new Exception("Resource " + this.boxResource.getResourceId() + " should be a FILE type. Found a " +
- this.boxResource.getResourceCase().name());
- }
+ logger.info("Completed Box Receiver stream for transfer {}", context.getTransferId());
}
}
diff --git a/transport/box-transport/src/main/java/org/apache/airavata/mft/transport/box/BoxSender.java b/transport/box-transport/src/main/java/org/apache/airavata/mft/transport/box/BoxSender.java
index c9b279f..6cd50c5 100644
--- a/transport/box-transport/src/main/java/org/apache/airavata/mft/transport/box/BoxSender.java
+++ b/transport/box-transport/src/main/java/org/apache/airavata/mft/transport/box/BoxSender.java
@@ -37,19 +37,13 @@
public class BoxSender implements Connector {
private static final Logger logger = LoggerFactory.getLogger(BoxSender.class);
- private BoxSecret boxSecret;
- private BoxResource boxResource;
private BoxAPIConnection boxClient;
-
@Override
- public void init(String resourceId, String credentialToken, String resourceServiceHost, int resourceServicePort, String secretServiceHost, int secretServicePort) throws Exception {
-
- ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
- boxResource = resourceClient.box().getBoxResource(BoxResourceGetRequest.newBuilder().setResourceId(resourceId).build());
+ public void init(String storageId, String credentialToken, String resourceServiceHost, int resourceServicePort, String secretServiceHost, int secretServicePort) throws Exception {
SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
- boxSecret = secretClient.box().getBoxSecret(BoxSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
+ BoxSecret boxSecret = secretClient.box().getBoxSecret(BoxSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
boxClient = new BoxAPIConnection(boxSecret.getAccessToken());
}
@@ -60,29 +54,22 @@
}
@Override
- public void startStream(ConnectorContext context) throws Exception {
+ public void startStream(String targetPath, ConnectorContext context) throws Exception {
logger.info("Starting Box Sender stream for transfer {}", context.getTransferId());
logger.debug("Content length for transfer {} {}", context.getTransferId(), context.getMetadata().getResourceSize());
- if (ResourceTypes.FILE.equals(this.boxResource.getResourceCase().name())) {
- BoxFile file = new BoxFile(this.boxClient, this.boxResource.getFile().getResourcePath());
+ BoxFile file = new BoxFile(this.boxClient, targetPath);
- // Upload chunks only if the file size is > 20mb
- // Ref: https://developer.box.com/guides/uploads/chunked/
- if (context.getMetadata().getResourceSize() > 20971520) {
- file.uploadLargeFile(context.getStreamBuffer().getInputStream(), context.getMetadata().getResourceSize());
- } else {
- file.uploadNewVersion(context.getStreamBuffer().getInputStream());
- }
-
- logger.info("Completed Box Sender stream for transfer {}", context.getTransferId());
-
+ // Upload chunks only if the file size is > 20mb
+ // Ref: https://developer.box.com/guides/uploads/chunked/
+ if (context.getMetadata().getResourceSize() > 20971520) {
+ file.uploadLargeFile(context.getStreamBuffer().getInputStream(), context.getMetadata().getResourceSize());
} else {
- logger.error("Resource {} should be a FILE type. Found a {}",
- this.boxResource.getResourceId(), this.boxResource.getResourceCase().name());
- throw new Exception("Resource " + this.boxResource.getResourceId() + " should be a FILE type. Found a " +
- this.boxResource.getResourceCase().name());
+ file.uploadNewVersion(context.getStreamBuffer().getInputStream());
}
+
+ logger.info("Completed Box Sender stream for transfer {}", context.getTransferId());
+
}
}
diff --git a/transport/dropbox-transport/src/main/java/org/apache/airavata/mft/transport/dropbox/DropboxMetadataCollector.java b/transport/dropbox-transport/src/main/java/org/apache/airavata/mft/transport/dropbox/DropboxMetadataCollector.java
index 906fb0b..2d00e10 100644
--- a/transport/dropbox-transport/src/main/java/org/apache/airavata/mft/transport/dropbox/DropboxMetadataCollector.java
+++ b/transport/dropbox-transport/src/main/java/org/apache/airavata/mft/transport/dropbox/DropboxMetadataCollector.java
@@ -28,8 +28,11 @@
import org.apache.airavata.mft.credential.stubs.dropbox.DropboxSecretGetRequest;
import org.apache.airavata.mft.resource.client.ResourceServiceClient;
import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
+import org.apache.airavata.mft.resource.stubs.common.FileResource;
import org.apache.airavata.mft.resource.stubs.dropbox.resource.DropboxResource;
import org.apache.airavata.mft.resource.stubs.dropbox.resource.DropboxResourceGetRequest;
+import org.apache.airavata.mft.resource.stubs.dropbox.storage.DropboxStorage;
+import org.apache.airavata.mft.resource.stubs.dropbox.storage.DropboxStorageGetRequest;
import org.apache.airavata.mft.secret.client.SecretServiceClient;
import org.apache.airavata.mft.secret.client.SecretServiceClientBuilder;
@@ -78,7 +81,7 @@
}
@Override
- public FileResourceMetadata getFileResourceMetadata(String parentResourceId, String resourcePath, String credentialToken) throws Exception {
+ public FileResourceMetadata getFileResourceMetadata(String storageId, String resourcePath, String credentialToken) throws Exception {
throw new UnsupportedOperationException("Method not implemented");
}
@@ -87,7 +90,7 @@
throw new UnsupportedOperationException("Method not implemented"); }
@Override
- public DirectoryResourceMetadata getDirectoryResourceMetadata(String parentResourceId, String resourcePath, String credentialToken) throws Exception {
+ public DirectoryResourceMetadata getDirectoryResourceMetadata(String storageId, String resourcePath, String credentialToken) throws Exception {
throw new UnsupportedOperationException("Method not implemented");
}
@@ -97,6 +100,23 @@
ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
DropboxResource dropboxResource = resourceClient.dropbox().getDropboxResource(DropboxResourceGetRequest.newBuilder().setResourceId(resourceId).build());
+ return isAvailable(dropboxResource, credentialToken);
+ }
+
+ @Override
+ public Boolean isAvailable(String storageId, String resourcePath, String credentialToken) throws Exception {
+
+ ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
+ DropboxStorage dropboxStorage = resourceClient.dropbox().getDropboxStorage(DropboxStorageGetRequest.newBuilder().setStorageId(storageId).build());
+
+ DropboxResource dropboxResource = DropboxResource.newBuilder()
+ .setFile(FileResource.newBuilder().setResourcePath(resourcePath).build())
+ .setDropboxStorage(dropboxStorage).build();
+
+ return isAvailable(dropboxResource, credentialToken);
+ }
+
+ private Boolean isAvailable(DropboxResource dropboxResource, String credentialToken) throws Exception {
SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
DropboxSecret dropboxSecret = secretClient.dropbox().getDropboxSecret(DropboxSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
diff --git a/transport/dropbox-transport/src/main/java/org/apache/airavata/mft/transport/dropbox/DropboxReceiver.java b/transport/dropbox-transport/src/main/java/org/apache/airavata/mft/transport/dropbox/DropboxReceiver.java
index 5bdf5a9..d694f91 100644
--- a/transport/dropbox-transport/src/main/java/org/apache/airavata/mft/transport/dropbox/DropboxReceiver.java
+++ b/transport/dropbox-transport/src/main/java/org/apache/airavata/mft/transport/dropbox/DropboxReceiver.java
@@ -40,14 +40,10 @@
private static final Logger logger = LoggerFactory.getLogger(DropboxReceiver.class);
- private DropboxResource dropboxResource;
private DbxClientV2 dbxClientV2;
@Override
- public void init(String resourceId, String credentialToken, String resourceServiceHost, int resourceServicePort, String secretServiceHost, int secretServicePort) throws Exception {
- ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
- this.dropboxResource = resourceClient.dropbox().getDropboxResource(DropboxResourceGetRequest.newBuilder().setResourceId(resourceId).build());
-
+ public void init(String storageId, String credentialToken, String resourceServiceHost, int resourceServicePort, String secretServiceHost, int secretServicePort) throws Exception {
SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
DropboxSecret dropboxSecret = secretClient.dropbox().getDropboxSecret(DropboxSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
@@ -61,48 +57,41 @@
}
@Override
- public void startStream(ConnectorContext context) throws Exception {
+ public void startStream(String targetPath, ConnectorContext context) throws Exception {
- if (ResourceTypes.FILE.equals(this.dropboxResource.getResourceCase().name())) {
- logger.info("Starting Dropbox Receiver stream for transfer {}", context.getTransferId());
+ logger.info("Starting Dropbox Receiver stream for transfer {}", context.getTransferId());
- InputStream inputStream = dbxClientV2.files().download(this.dropboxResource.getFile().getResourcePath()).getInputStream();
- OutputStream os = context.getStreamBuffer().getOutputStream();
- int read;
- long bytes = 0;
- long fileSize = context.getMetadata().getResourceSize();
- byte[] buf = new byte[1024];
- while (true) {
- int bufSize = 0;
+ InputStream inputStream = dbxClientV2.files().download(targetPath).getInputStream();
+ OutputStream os = context.getStreamBuffer().getOutputStream();
+ int read;
+ long bytes = 0;
+ long fileSize = context.getMetadata().getResourceSize();
+ byte[] buf = new byte[1024];
+ while (true) {
+ int bufSize = 0;
- if (buf.length < fileSize) {
- bufSize = buf.length;
- } else {
- bufSize = (int) fileSize;
- }
- bufSize = inputStream.read(buf, 0, bufSize);
+ if (buf.length < fileSize) {
+ bufSize = buf.length;
+ } else {
+ bufSize = (int) fileSize;
+ }
+ bufSize = inputStream.read(buf, 0, bufSize);
- if (bufSize < 0) {
- break;
- }
-
- os.write(buf, 0, bufSize);
- os.flush();
-
- fileSize -= bufSize;
- if (fileSize == 0L)
- break;
+ if (bufSize < 0) {
+ break;
}
- os.close();
+ os.write(buf, 0, bufSize);
+ os.flush();
- logger.info("Completed Dropbox Receiver stream for transfer {}", context.getTransferId());
-
- } else {
- logger.error("Resource {} should be a FILE type. Found a {}",
- this.dropboxResource.getResourceId(), this.dropboxResource.getResourceCase().name());
- throw new Exception("Resource " + this.dropboxResource.getResourceId() + " should be a FILE type. Found a " +
- this.dropboxResource.getResourceCase().name());
+ fileSize -= bufSize;
+ if (fileSize == 0L)
+ break;
}
+
+ os.close();
+
+ logger.info("Completed Dropbox Receiver stream for transfer {}", context.getTransferId());
+
}
}
diff --git a/transport/dropbox-transport/src/main/java/org/apache/airavata/mft/transport/dropbox/DropboxSender.java b/transport/dropbox-transport/src/main/java/org/apache/airavata/mft/transport/dropbox/DropboxSender.java
index ffab965..da8f8b7 100644
--- a/transport/dropbox-transport/src/main/java/org/apache/airavata/mft/transport/dropbox/DropboxSender.java
+++ b/transport/dropbox-transport/src/main/java/org/apache/airavata/mft/transport/dropbox/DropboxSender.java
@@ -40,13 +40,10 @@
private static final Logger logger = LoggerFactory.getLogger(DropboxSender.class);
- private DropboxResource dropboxResource;
private DbxClientV2 dbxClientV2;
@Override
- public void init(String resourceId, String credentialToken, String resourceServiceHost, int resourceServicePort, String secretServiceHost, int secretServicePort) throws Exception {
- ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
- this.dropboxResource = resourceClient.dropbox().getDropboxResource(DropboxResourceGetRequest.newBuilder().setResourceId(resourceId).build());
+ public void init(String storageId, String credentialToken, String resourceServiceHost, int resourceServicePort, String secretServiceHost, int secretServicePort) throws Exception {
SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
DropboxSecret dropboxSecret = secretClient.dropbox().getDropboxSecret(DropboxSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
@@ -61,24 +58,15 @@
}
@Override
- public void startStream(ConnectorContext context) throws Exception {
+ public void startStream(String targetPath, ConnectorContext context) throws Exception {
logger.info("Starting Dropbox Sender stream for transfer {}", context.getTransferId());
logger.info("Content length for transfer {} {}", context.getTransferId(), context.getMetadata().getResourceSize());
+ FileMetadata metadata = dbxClientV2.files().uploadBuilder(targetPath)
+ .withMode(WriteMode.OVERWRITE)
+ .uploadAndFinish(context.getStreamBuffer().getInputStream());
+ logger.info("Completed Dropbox Sender stream for transfer {}", context.getTransferId());
- if (ResourceTypes.FILE.equals(this.dropboxResource.getResourceCase().name())) {
- FileMetadata metadata = dbxClientV2.files().uploadBuilder(dropboxResource.getFile().getResourcePath())
- .withMode(WriteMode.OVERWRITE)
- .uploadAndFinish(context.getStreamBuffer().getInputStream());
- logger.info("Completed Dropbox Sender stream for transfer {}", context.getTransferId());
-
- } else {
- logger.error("Resource {} should be a FILE type. Found a {}",
- this.dropboxResource.getResourceId(), this.dropboxResource.getResourceCase().name());
- throw new Exception("Resource " + this.dropboxResource.getResourceId() + " should be a FILE type. Found a " +
- this.dropboxResource.getResourceCase().name());
- }
-
}
}
diff --git a/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPMetadataCollector.java b/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPMetadataCollector.java
index 66675f3..5f6f7af 100644
--- a/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPMetadataCollector.java
+++ b/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPMetadataCollector.java
@@ -25,8 +25,11 @@
import org.apache.airavata.mft.credential.stubs.ftp.FTPSecretGetRequest;
import org.apache.airavata.mft.resource.client.ResourceServiceClient;
import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
+import org.apache.airavata.mft.resource.stubs.common.FileResource;
import org.apache.airavata.mft.resource.stubs.ftp.resource.FTPResource;
import org.apache.airavata.mft.resource.stubs.ftp.resource.FTPResourceGetRequest;
+import org.apache.airavata.mft.resource.stubs.ftp.storage.FTPStorage;
+import org.apache.airavata.mft.resource.stubs.ftp.storage.FTPStorageGetRequest;
import org.apache.airavata.mft.secret.client.SecretServiceClient;
import org.apache.airavata.mft.secret.client.SecretServiceClientBuilder;
import org.apache.commons.net.ftp.FTPClient;
@@ -74,7 +77,7 @@
FileResourceMetadata resourceMetadata = new FileResourceMetadata();
FTPClient ftpClient = null;
try {
- ftpClient = FTPTransportUtil.getFTPClient(ftpResource, ftpSecret);
+ ftpClient = FTPTransportUtil.getFTPClient(ftpResource.getFtpStorage(), ftpSecret);
logger.info("Fetching metadata for resource {} in {}", ftpResource.getFile().getResourcePath(), ftpResource.getFtpStorage().getHost());
FTPFile ftpFile = ftpClient.mlistFile(ftpResource.getFile().getResourcePath());
@@ -99,7 +102,7 @@
}
@Override
- public FileResourceMetadata getFileResourceMetadata(String parentResourceId, String resourcePath, String credentialToken) throws Exception {
+ public FileResourceMetadata getFileResourceMetadata(String storageId, String resourcePath, String credentialToken) throws Exception {
throw new UnsupportedOperationException("Method not implemented");
}
@@ -108,7 +111,7 @@
throw new UnsupportedOperationException("Method not implemented"); }
@Override
- public DirectoryResourceMetadata getDirectoryResourceMetadata(String parentResourceId, String resourcePath, String credentialToken) throws Exception {
+ public DirectoryResourceMetadata getDirectoryResourceMetadata(String storageId, String resourcePath, String credentialToken) throws Exception {
throw new UnsupportedOperationException("Method not implemented");
}
@@ -119,12 +122,32 @@
ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
FTPResource ftpResource = resourceClient.ftp().getFTPResource(FTPResourceGetRequest.newBuilder().setResourceId(resourceId).build());
+
+ return isAvailable(ftpResource, credentialToken);
+ }
+
+ @Override
+ public Boolean isAvailable(String storageId, String resourcePath, String credentialToken) throws Exception {
+
+ checkInitialized();
+
+ ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
+ FTPStorage ftpStorage = resourceClient.ftp().getFTPStorage(FTPStorageGetRequest.newBuilder().setStorageId(storageId).build());
+
+ FTPResource ftpResource = FTPResource.newBuilder()
+ .setFile(FileResource.newBuilder().setResourcePath(resourcePath).build())
+ .setFtpStorage(ftpStorage).build();
+ return isAvailable(ftpResource, credentialToken);
+ }
+
+ public Boolean isAvailable(FTPResource ftpResource, String credentialToken) {
+
SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
FTPSecret ftpSecret = secretClient.ftp().getFTPSecret(FTPSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
FTPClient ftpClient = null;
try {
- ftpClient = FTPTransportUtil.getFTPClient(ftpResource, ftpSecret);
+ ftpClient = FTPTransportUtil.getFTPClient(ftpResource.getFtpStorage(), ftpSecret);
InputStream inputStream = null;
switch (ftpResource.getResourceCase().name()){
diff --git a/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPReceiver.java b/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPReceiver.java
index 9225b0e..f3b4ac8 100644
--- a/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPReceiver.java
+++ b/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPReceiver.java
@@ -26,6 +26,8 @@
import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
import org.apache.airavata.mft.resource.stubs.ftp.resource.FTPResource;
import org.apache.airavata.mft.resource.stubs.ftp.resource.FTPResourceGetRequest;
+import org.apache.airavata.mft.resource.stubs.ftp.storage.FTPStorage;
+import org.apache.airavata.mft.resource.stubs.ftp.storage.FTPStorageGetRequest;
import org.apache.airavata.mft.secret.client.SecretServiceClient;
import org.apache.airavata.mft.secret.client.SecretServiceClientBuilder;
import org.apache.commons.net.ftp.FTPClient;
@@ -39,21 +41,20 @@
private static final Logger logger = LoggerFactory.getLogger(FTPReceiver.class);
- private FTPResource resource;
private boolean initialized;
private FTPClient ftpClient;
@Override
- public void init(String resourceId, String credentialToken, String resourceServiceHost, int resourceServicePort, String secretServiceHost, int secretServicePort) throws Exception {
+ public void init(String storageId, String credentialToken, String resourceServiceHost, int resourceServicePort, String secretServiceHost, int secretServicePort) throws Exception {
this.initialized = true;
ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
- this.resource = resourceClient.ftp().getFTPResource(FTPResourceGetRequest.newBuilder().setResourceId(resourceId).build());
+ FTPStorage ftpStorage = resourceClient.ftp().getFTPStorage(FTPStorageGetRequest.newBuilder().setStorageId(storageId).build());
SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
FTPSecret ftpSecret = secretClient.ftp().getFTPSecret(FTPSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
- this.ftpClient = FTPTransportUtil.getFTPClient(this.resource, ftpSecret);
+ this.ftpClient = FTPTransportUtil.getFTPClient(ftpStorage, ftpSecret);
}
@Override
@@ -62,50 +63,43 @@
}
@Override
- public void startStream(ConnectorContext context) throws Exception {
+ public void startStream(String targetPath, ConnectorContext context) throws Exception {
- if (ResourceTypes.FILE.equals(this.resource.getResourceCase().name())) {
- logger.info("Starting FTP receiver stream for transfer {}", context.getTransferId());
+ logger.info("Starting FTP receiver stream for transfer {}", context.getTransferId());
- checkInitialized();
- OutputStream streamOs = context.getStreamBuffer().getOutputStream();
- InputStream inputStream = ftpClient.retrieveFileStream(resource.getFile().getResourcePath());
+ checkInitialized();
+ OutputStream streamOs = context.getStreamBuffer().getOutputStream();
+ InputStream inputStream = ftpClient.retrieveFileStream(targetPath);
- long fileSize = context.getMetadata().getResourceSize();
+ long fileSize = context.getMetadata().getResourceSize();
- byte[] buf = new byte[1024];
- while (true) {
- int bufSize;
+ byte[] buf = new byte[1024];
+ while (true) {
+ int bufSize;
- if (buf.length < fileSize) {
- bufSize = buf.length;
- } else {
- bufSize = (int) fileSize;
- }
- bufSize = inputStream.read(buf, 0, bufSize);
+ if (buf.length < fileSize) {
+ bufSize = buf.length;
+ } else {
+ bufSize = (int) fileSize;
+ }
+ bufSize = inputStream.read(buf, 0, bufSize);
- if (bufSize < 0) {
- break;
- }
-
- streamOs.write(buf, 0, bufSize);
- streamOs.flush();
-
- fileSize -= bufSize;
- if (fileSize == 0L)
- break;
+ if (bufSize < 0) {
+ break;
}
- inputStream.close();
- streamOs.close();
- logger.info("Completed FTP receiver stream for transfer {}", context.getTransferId());
+ streamOs.write(buf, 0, bufSize);
+ streamOs.flush();
- } else {
- logger.error("Resource {} should be a FILE type. Found a {}",
- this.resource.getResourceId(), this.resource.getResourceCase().name());
- throw new Exception("Resource " + this.resource.getResourceId() + " should be a FILE type. Found a " +
- this.resource.getResourceCase().name());
+ fileSize -= bufSize;
+ if (fileSize == 0L)
+ break;
}
+
+ inputStream.close();
+ streamOs.close();
+ logger.info("Completed FTP receiver stream for transfer {}", context.getTransferId());
+
}
private void checkInitialized() {
diff --git a/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPSender.java b/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPSender.java
index f3a55f7..5579cb8 100644
--- a/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPSender.java
+++ b/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPSender.java
@@ -18,14 +18,13 @@
package org.apache.airavata.mft.transport.ftp;
import org.apache.airavata.mft.core.ConnectorContext;
-import org.apache.airavata.mft.core.ResourceTypes;
import org.apache.airavata.mft.core.api.Connector;
import org.apache.airavata.mft.credential.stubs.ftp.FTPSecret;
import org.apache.airavata.mft.credential.stubs.ftp.FTPSecretGetRequest;
import org.apache.airavata.mft.resource.client.ResourceServiceClient;
import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
-import org.apache.airavata.mft.resource.stubs.ftp.resource.FTPResource;
-import org.apache.airavata.mft.resource.stubs.ftp.resource.FTPResourceGetRequest;
+import org.apache.airavata.mft.resource.stubs.ftp.storage.FTPStorage;
+import org.apache.airavata.mft.resource.stubs.ftp.storage.FTPStorageGetRequest;
import org.apache.airavata.mft.secret.client.SecretServiceClient;
import org.apache.airavata.mft.secret.client.SecretServiceClientBuilder;
import org.apache.commons.net.ftp.FTPClient;
@@ -39,21 +38,20 @@
private static final Logger logger = LoggerFactory.getLogger(FTPReceiver.class);
- private FTPResource resource;
private boolean initialized;
private FTPClient ftpClient;
@Override
- public void init(String resourceId, String credentialToken, String resourceServiceHost, int resourceServicePort, String secretServiceHost, int secretServicePort) throws Exception {
+ public void init(String storageId, String credentialToken, String resourceServiceHost, int resourceServicePort, String secretServiceHost, int secretServicePort) throws Exception {
this.initialized = true;
ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
- this.resource = resourceClient.ftp().getFTPResource(FTPResourceGetRequest.newBuilder().setResourceId(resourceId).build());
+ FTPStorage ftpStorage = resourceClient.ftp().getFTPStorage(FTPStorageGetRequest.newBuilder().setStorageId(storageId).build());
SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
FTPSecret ftpSecret = secretClient.ftp().getFTPSecret(FTPSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
- this.ftpClient = FTPTransportUtil.getFTPClient(this.resource, ftpSecret);
+ this.ftpClient = FTPTransportUtil.getFTPClient(ftpStorage, ftpSecret);
}
@Override
@@ -62,7 +60,7 @@
}
@Override
- public void startStream(ConnectorContext context) throws Exception {
+ public void startStream(String targetPath, ConnectorContext context) throws Exception {
logger.info("Starting FTP sender stream for transfer {}", context.getTransferId());
@@ -70,42 +68,35 @@
logger.info("Completed FTP sender stream for transfer {}", context.getTransferId());
- if (ResourceTypes.FILE.equals(this.resource.getResourceCase().name())) {
- InputStream in = context.getStreamBuffer().getInputStream();
- long fileSize = context.getMetadata().getResourceSize();
- OutputStream outputStream = ftpClient.storeFileStream(resource.getFile().getResourcePath());
+ InputStream in = context.getStreamBuffer().getInputStream();
+ long fileSize = context.getMetadata().getResourceSize();
+ OutputStream outputStream = ftpClient.storeFileStream(targetPath);
- byte[] buf = new byte[1024];
- while (true) {
- int bufSize;
+ byte[] buf = new byte[1024];
+ while (true) {
+ int bufSize;
- if (buf.length < fileSize) {
- bufSize = buf.length;
- } else {
- bufSize = (int) fileSize;
- }
- bufSize = in.read(buf, 0, bufSize);
+ if (buf.length < fileSize) {
+ bufSize = buf.length;
+ } else {
+ bufSize = (int) fileSize;
+ }
+ bufSize = in.read(buf, 0, bufSize);
- if (bufSize < 0) {
- break;
- }
-
- outputStream.write(buf, 0, bufSize);
- outputStream.flush();
-
- fileSize -= bufSize;
- if (fileSize == 0L)
- break;
+ if (bufSize < 0) {
+ break;
}
- in.close();
- outputStream.close();
- } else {
- logger.error("Resource {} should be a FILE type. Found a {}",
- this.resource.getResourceId(), this.resource.getResourceCase().name());
- throw new Exception("Resource " + this.resource.getResourceId() + " should be a FILE type. Found a " +
- this.resource.getResourceCase().name());
+ outputStream.write(buf, 0, bufSize);
+ outputStream.flush();
+
+ fileSize -= bufSize;
+ if (fileSize == 0L)
+ break;
}
+
+ in.close();
+ outputStream.close();
}
private void checkInitialized() {
diff --git a/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPTransportUtil.java b/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPTransportUtil.java
index 7a1e3c8..988a208 100644
--- a/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPTransportUtil.java
+++ b/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPTransportUtil.java
@@ -18,7 +18,7 @@
package org.apache.airavata.mft.transport.ftp;
import org.apache.airavata.mft.credential.stubs.ftp.FTPSecret;
-import org.apache.airavata.mft.resource.stubs.ftp.resource.FTPResource;
+import org.apache.airavata.mft.resource.stubs.ftp.storage.FTPStorage;
import org.apache.commons.net.ftp.FTPClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -29,10 +29,10 @@
private static final Logger logger = LoggerFactory.getLogger(FTPTransportUtil.class);
- static FTPClient getFTPClient(FTPResource ftpResource, FTPSecret ftpSecret) throws IOException {
+ static FTPClient getFTPClient(FTPStorage ftpStorage, FTPSecret ftpSecret) throws IOException {
FTPClient ftpClient = new FTPClient();
- ftpClient.connect(ftpResource.getFtpStorage().getHost(), ftpResource.getFtpStorage().getPort());
+ ftpClient.connect(ftpStorage.getHost(), ftpStorage.getPort());
ftpClient.enterLocalActiveMode();
ftpClient.login(ftpSecret.getUserId(), ftpSecret.getPassword());
diff --git a/transport/gcp-transport/src/main/java/org/apache/airavata/mft/transport/gcp/GCSMetadataCollector.java b/transport/gcp-transport/src/main/java/org/apache/airavata/mft/transport/gcp/GCSMetadataCollector.java
index aa97091..0d1a1a8 100644
--- a/transport/gcp-transport/src/main/java/org/apache/airavata/mft/transport/gcp/GCSMetadataCollector.java
+++ b/transport/gcp-transport/src/main/java/org/apache/airavata/mft/transport/gcp/GCSMetadataCollector.java
@@ -33,8 +33,11 @@
import org.apache.airavata.mft.credential.stubs.gcs.GCSSecretGetRequest;
import org.apache.airavata.mft.resource.client.ResourceServiceClient;
import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
+import org.apache.airavata.mft.resource.stubs.common.FileResource;
import org.apache.airavata.mft.resource.stubs.gcs.resource.GCSResource;
import org.apache.airavata.mft.resource.stubs.gcs.resource.GCSResourceGetRequest;
+import org.apache.airavata.mft.resource.stubs.gcs.storage.GCSStorage;
+import org.apache.airavata.mft.resource.stubs.gcs.storage.GCSStorageGetRequest;
import org.apache.airavata.mft.secret.client.SecretServiceClient;
import org.apache.airavata.mft.secret.client.SecretServiceClientBuilder;
@@ -99,7 +102,7 @@
}
@Override
- public FileResourceMetadata getFileResourceMetadata(String parentResourceId, String resourcePath, String credentialToken) throws Exception {
+ public FileResourceMetadata getFileResourceMetadata(String storageId, String resourcePath, String credentialToken) throws Exception {
throw new UnsupportedOperationException("Method not implemented");
}
@@ -108,7 +111,7 @@
throw new UnsupportedOperationException("Method not implemented"); }
@Override
- public DirectoryResourceMetadata getDirectoryResourceMetadata(String parentResourceId, String resourcePath, String credentialToken) throws Exception {
+ public DirectoryResourceMetadata getDirectoryResourceMetadata(String storageId, String resourcePath, String credentialToken) throws Exception {
throw new UnsupportedOperationException("Method not implemented");
}
@@ -118,6 +121,24 @@
ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
GCSResource gcsResource = resourceClient.gcs().getGCSResource(GCSResourceGetRequest.newBuilder().setResourceId(resourceId).build());
+ return isAvailable(gcsResource, credentialToken);
+ }
+
+ @Override
+ public Boolean isAvailable(String storageId, String resourcePath, String credentialToken) throws Exception {
+ checkInitialized();
+ ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
+ GCSStorage gcsStorage = resourceClient.gcs().getGCSStorage(GCSStorageGetRequest.newBuilder().setStorageId(storageId).build());
+
+ GCSResource gcsResource = GCSResource.newBuilder()
+ .setFile(FileResource.newBuilder().setResourcePath(resourcePath).build())
+ .setGcsStorage(gcsStorage).build();
+
+ return isAvailable(gcsResource, credentialToken);
+ }
+
+ private Boolean isAvailable(GCSResource gcsResource, String credentialToken) throws Exception {
+
SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
GCSSecret gcsSecret = secretClient.gcs().getGCSSecret(GCSSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
@@ -134,10 +155,10 @@
switch (gcsResource.getResourceCase().name()){
case ResourceTypes.FILE:
return !storage.objects().get(gcsResource.getGcsStorage().getBucketName(), gcsResource.getFile().getResourcePath())
- .execute().isEmpty();
+ .execute().isEmpty();
case ResourceTypes.DIRECTORY:
return !storage.objects().get(gcsResource.getGcsStorage().getBucketName(), gcsResource.getDirectory().getResourcePath())
- .execute().isEmpty();
+ .execute().isEmpty();
}
return false;
}
diff --git a/transport/gcp-transport/src/main/java/org/apache/airavata/mft/transport/gcp/GCSReceiver.java b/transport/gcp-transport/src/main/java/org/apache/airavata/mft/transport/gcp/GCSReceiver.java
index 03fd31c..d734f92 100644
--- a/transport/gcp-transport/src/main/java/org/apache/airavata/mft/transport/gcp/GCSReceiver.java
+++ b/transport/gcp-transport/src/main/java/org/apache/airavata/mft/transport/gcp/GCSReceiver.java
@@ -33,6 +33,8 @@
import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
import org.apache.airavata.mft.resource.stubs.gcs.resource.GCSResource;
import org.apache.airavata.mft.resource.stubs.gcs.resource.GCSResourceGetRequest;
+import org.apache.airavata.mft.resource.stubs.gcs.storage.GCSStorage;
+import org.apache.airavata.mft.resource.stubs.gcs.storage.GCSStorageGetRequest;
import org.apache.airavata.mft.secret.client.SecretServiceClient;
import org.apache.airavata.mft.secret.client.SecretServiceClientBuilder;
import org.slf4j.Logger;
@@ -49,16 +51,18 @@
private static final Logger logger = LoggerFactory.getLogger(GCSReceiver.class);
- private GCSResource gcsResource;
private Storage storage;
+ private GCSStorage gcsStorage;
@Override
- public void init(String resourceId, String credentialToken, String resourceServiceHost, int resourceServicePort, String secretServiceHost, int secretServicePort) throws Exception {
+ public void init(String storageId, String credentialToken, String resourceServiceHost, int resourceServicePort, String secretServiceHost, int secretServicePort) throws Exception {
+
ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
- this.gcsResource = resourceClient.gcs().getGCSResource(GCSResourceGetRequest.newBuilder().setResourceId(resourceId).build());
+ this.gcsStorage = resourceClient.gcs().getGCSStorage(GCSStorageGetRequest.newBuilder().setStorageId(storageId).build());
SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
GCSSecret gcsSecret = secretClient.gcs().getGCSSecret(GCSSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
+
HttpTransport transport = GoogleNetHttpTransport.newTrustedTransport();
JsonFactory jsonFactory = new JacksonFactory();
String jsonString = gcsSecret.getCredentialsJson();
@@ -76,48 +80,40 @@
}
@Override
- public void startStream(ConnectorContext context) throws Exception {
+ public void startStream(String targetPath, ConnectorContext context) throws Exception {
logger.info("Starting GCS Receiver stream for transfer {}", context.getTransferId());
- if (ResourceTypes.FILE.equals(this.gcsResource.getResourceCase().name())) {
- InputStream inputStream = storage.objects().get(this.gcsResource.getGcsStorage().getBucketName(),
- this.gcsResource.getFile().getResourcePath()).executeMediaAsInputStream();
- OutputStream os = context.getStreamBuffer().getOutputStream();
- int read;
- long bytes = 0;
- long fileSize = context.getMetadata().getResourceSize();
- byte[] buf = new byte[1024];
- while (true) {
- int bufSize = 0;
+ InputStream inputStream = storage.objects().get(this.gcsStorage.getBucketName(), targetPath).executeMediaAsInputStream();
+ OutputStream os = context.getStreamBuffer().getOutputStream();
+ int read;
+ long bytes = 0;
+ long fileSize = context.getMetadata().getResourceSize();
+ byte[] buf = new byte[1024];
+ while (true) {
+ int bufSize = 0;
- if (buf.length < fileSize) {
- bufSize = buf.length;
- } else {
- bufSize = (int) fileSize;
- }
- bufSize = inputStream.read(buf, 0, bufSize);
+ if (buf.length < fileSize) {
+ bufSize = buf.length;
+ } else {
+ bufSize = (int) fileSize;
+ }
+ bufSize = inputStream.read(buf, 0, bufSize);
- if (bufSize < 0) {
- break;
- }
-
- os.write(buf, 0, bufSize);
- os.flush();
-
- fileSize -= bufSize;
- if (fileSize == 0L)
- break;
+ if (bufSize < 0) {
+ break;
}
- os.close();
+ os.write(buf, 0, bufSize);
+ os.flush();
- logger.info("Completed GCS Receiver stream for transfer {}", context.getTransferId());
-
- } else {
- logger.error("Resource {} should be a FILE type. Found a {}",
- this.gcsResource.getResourceId(), this.gcsResource.getResourceCase().name());
- throw new Exception("Resource " + this.gcsResource.getResourceId() + " should be a FILE type. Found a " +
- this.gcsResource.getResourceCase().name());
+ fileSize -= bufSize;
+ if (fileSize == 0L)
+ break;
}
+
+ os.close();
+
+ logger.info("Completed GCS Receiver stream for transfer {}", context.getTransferId());
+
}
}
diff --git a/transport/gcp-transport/src/main/java/org/apache/airavata/mft/transport/gcp/GCSSender.java b/transport/gcp-transport/src/main/java/org/apache/airavata/mft/transport/gcp/GCSSender.java
index 368d24d..4d0ef29 100644
--- a/transport/gcp-transport/src/main/java/org/apache/airavata/mft/transport/gcp/GCSSender.java
+++ b/transport/gcp-transport/src/main/java/org/apache/airavata/mft/transport/gcp/GCSSender.java
@@ -31,14 +31,13 @@
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.apache.airavata.mft.core.ConnectorContext;
-import org.apache.airavata.mft.core.ResourceTypes;
import org.apache.airavata.mft.core.api.Connector;
import org.apache.airavata.mft.credential.stubs.gcs.GCSSecret;
import org.apache.airavata.mft.credential.stubs.gcs.GCSSecretGetRequest;
import org.apache.airavata.mft.resource.client.ResourceServiceClient;
import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
-import org.apache.airavata.mft.resource.stubs.gcs.resource.GCSResource;
-import org.apache.airavata.mft.resource.stubs.gcs.resource.GCSResourceGetRequest;
+import org.apache.airavata.mft.resource.stubs.gcs.storage.GCSStorage;
+import org.apache.airavata.mft.resource.stubs.gcs.storage.GCSStorageGetRequest;
import org.apache.airavata.mft.secret.client.SecretServiceClient;
import org.apache.airavata.mft.secret.client.SecretServiceClientBuilder;
import org.slf4j.Logger;
@@ -54,15 +53,15 @@
private static final Logger logger = LoggerFactory.getLogger(GCSSender.class);
- private GCSResource gcsResource;
private Storage storage;
+ private GCSStorage gcsStorage;
private JsonObject jsonObject;
@Override
- public void init(String resourceId, String credentialToken, String resourceServiceHost, int resourceServicePort, String secretServiceHost, int secretServicePort) throws Exception {
+ public void init(String storageId, String credentialToken, String resourceServiceHost, int resourceServicePort, String secretServiceHost, int secretServicePort) throws Exception {
ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
- this.gcsResource = resourceClient.gcs().getGCSResource(GCSResourceGetRequest.newBuilder().setResourceId(resourceId).build());
+ this.gcsStorage = resourceClient.gcs().getGCSStorage(GCSStorageGetRequest.newBuilder().setStorageId(storageId).build());
SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
GCSSecret gcsSecret = secretClient.gcs().getGCSSecret(GCSSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
@@ -86,33 +85,23 @@
}
@Override
- public void startStream(ConnectorContext context) throws Exception {
+ public void startStream(String targetPath, ConnectorContext context) throws Exception {
logger.info("Starting GCS Sender stream for transfer {}", context.getTransferId());
logger.debug("Content length for transfer {} {}", context.getTransferId(), context.getMetadata().getResourceSize());
- if (ResourceTypes.FILE.equals(this.gcsResource.getResourceCase().name())) {
+ InputStreamContent contentStream = new InputStreamContent(
+ null, context.getStreamBuffer().getInputStream());
+ String entityUser = jsonObject.get("client_email").getAsString();
+ StorageObject objectMetadata = new StorageObject()
+ // Set the destination object name
+ .setName(targetPath)
+ // Set the access control list to publicly read-only
+ .setAcl(Arrays.asList(new ObjectAccessControl().setEntity("user-" + entityUser).setRole("OWNER")));
- InputStreamContent contentStream = new InputStreamContent(
- null, context.getStreamBuffer().getInputStream());
- String entityUser = jsonObject.get("client_email").getAsString();
- StorageObject objectMetadata = new StorageObject()
- // Set the destination object name
- .setName(this.gcsResource.getFile().getResourcePath())
- // Set the access control list to publicly read-only
- .setAcl(Arrays.asList(new ObjectAccessControl().setEntity("user-" + entityUser).setRole("OWNER")));
+ Insert insertRequest = storage.objects().insert(this.gcsStorage.getBucketName(), objectMetadata, contentStream);
- Insert insertRequest = storage.objects().insert(this.gcsResource.getGcsStorage().getBucketName(), objectMetadata, contentStream);
+ insertRequest.execute();
- insertRequest.execute();
-
- logger.info("Completed GCS Sender stream for transfer {}", context.getTransferId());
- } else {
- logger.error("Resource {} should be a FILE type. Found a {}",
- this.gcsResource.getResourceId(), this.gcsResource.getResourceCase().name());
- throw new Exception("Resource " + this.gcsResource.getResourceId() + " should be a FILE type. Found a " +
- this.gcsResource.getResourceCase().name());
- }
-
-
+ logger.info("Completed GCS Sender stream for transfer {}", context.getTransferId());
}
}
diff --git a/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalMetadataCollector.java b/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalMetadataCollector.java
index f24c70f..9faa70a 100644
--- a/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalMetadataCollector.java
+++ b/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalMetadataCollector.java
@@ -23,8 +23,11 @@
import org.apache.airavata.mft.core.api.MetadataCollector;
import org.apache.airavata.mft.resource.client.ResourceServiceClient;
import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
+import org.apache.airavata.mft.resource.stubs.common.FileResource;
import org.apache.airavata.mft.resource.stubs.local.resource.LocalResource;
import org.apache.airavata.mft.resource.stubs.local.resource.LocalResourceGetRequest;
+import org.apache.airavata.mft.resource.stubs.local.storage.LocalStorage;
+import org.apache.airavata.mft.resource.stubs.local.storage.LocalStorageGetRequest;
import java.io.File;
import java.io.FileInputStream;
@@ -92,7 +95,7 @@
}
@Override
- public FileResourceMetadata getFileResourceMetadata(String parentResourceId, String resourcePath, String credentialToken) throws Exception {
+ public FileResourceMetadata getFileResourceMetadata(String storageId, String resourcePath, String credentialToken) throws Exception {
throw new UnsupportedOperationException("Method not implemented");
}
@@ -101,15 +104,29 @@
throw new UnsupportedOperationException("Method not implemented"); }
@Override
- public DirectoryResourceMetadata getDirectoryResourceMetadata(String parentResourceId, String resourcePath, String credentialToken) throws Exception {
+ public DirectoryResourceMetadata getDirectoryResourceMetadata(String storageId, String resourcePath, String credentialToken) throws Exception {
throw new UnsupportedOperationException("Method not implemented");
}
@Override
public Boolean isAvailable(String resourceId, String credentialToken) throws Exception {
- ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
+ ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
LocalResource localResource = resourceClient.local().getLocalResource(LocalResourceGetRequest.newBuilder().setResourceId(resourceId).build());
+ return isAvailable(localResource, credentialToken);
+ }
+ @Override
+ public Boolean isAvailable(String storageId, String resourcePath, String credentialToken) throws Exception {
+ ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
+ LocalStorage localStorage = resourceClient.local().getLocalStorage(LocalStorageGetRequest.newBuilder().setStorageId(storageId).build());
+
+ LocalResource localResource = LocalResource.newBuilder()
+ .setFile(FileResource.newBuilder().setResourcePath(resourcePath).build())
+ .setLocalStorage(localStorage).build();
+ return isAvailable(localResource, credentialToken);
+ }
+
+ public Boolean isAvailable(LocalResource localResource, String credentialToken) throws Exception {
switch (localResource.getResourceCase().name()){
case ResourceTypes.FILE:
return new File(localResource.getFile().getResourcePath()).exists();
diff --git a/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalReceiver.java b/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalReceiver.java
index 1b61fb1..53776ff 100644
--- a/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalReceiver.java
+++ b/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalReceiver.java
@@ -18,12 +18,7 @@
package org.apache.airavata.mft.transport.local;
import org.apache.airavata.mft.core.ConnectorContext;
-import org.apache.airavata.mft.core.ResourceTypes;
import org.apache.airavata.mft.core.api.Connector;
-import org.apache.airavata.mft.resource.client.ResourceServiceClient;
-import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
-import org.apache.airavata.mft.resource.stubs.local.resource.LocalResource;
-import org.apache.airavata.mft.resource.stubs.local.resource.LocalResourceGetRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,16 +28,12 @@
private static final Logger logger = LoggerFactory.getLogger(LocalReceiver.class);
- private LocalResource resource;
private boolean initialized;
@Override
- public void init(String resourceId, String credentialToken, String resourceServiceHost, int resourceServicePort,
+ public void init(String storageId, String credentialToken, String resourceServiceHost, int resourceServicePort,
String secretServiceHost, int secretServicePort) throws Exception {
this.initialized = true;
-
- ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
- this.resource = resourceClient.local().getLocalResource(LocalResourceGetRequest.newBuilder().setResourceId(resourceId).build());
}
@Override
@@ -57,50 +48,41 @@
}
@Override
- public void startStream(ConnectorContext context) throws Exception {
+ public void startStream(String targetPath, ConnectorContext context) throws Exception {
logger.info("Starting local receiver stream for transfer {}", context.getTransferId());
checkInitialized();
+ OutputStream streamOs = context.getStreamBuffer().getOutputStream();
+ FileInputStream fis = new FileInputStream(new File(targetPath));
- if (ResourceTypes.FILE.equals(this.resource.getResourceCase().name())) {
- OutputStream streamOs = context.getStreamBuffer().getOutputStream();
- FileInputStream fis = new FileInputStream(new File(resource.getFile().getResourcePath()));
+ long fileSize = context.getMetadata().getResourceSize();
- long fileSize = context.getMetadata().getResourceSize();
+ byte[] buf = new byte[1024];
+ while (true) {
+ int bufSize = 0;
- byte[] buf = new byte[1024];
- while (true) {
- int bufSize = 0;
+ if (buf.length < fileSize) {
+ bufSize = buf.length;
+ } else {
+ bufSize = (int) fileSize;
+ }
+ bufSize = fis.read(buf, 0, bufSize);
- if (buf.length < fileSize) {
- bufSize = buf.length;
- } else {
- bufSize = (int) fileSize;
- }
- bufSize = fis.read(buf, 0, bufSize);
-
- if (bufSize < 0) {
- break;
- }
-
- streamOs.write(buf, 0, bufSize);
- streamOs.flush();
-
- fileSize -= bufSize;
- if (fileSize == 0L)
- break;
+ if (bufSize < 0) {
+ break;
}
- fis.close();
- streamOs.close();
- logger.info("Completed local receiver stream for transfer {}", context.getTransferId());
+ streamOs.write(buf, 0, bufSize);
+ streamOs.flush();
- } else {
- logger.error("Resource {} should be a FILE type. Found a {}",
- this.resource.getResourceId(), this.resource.getResourceCase().name());
- throw new Exception("Resource " + this.resource.getResourceId() + " should be a FILE type. Found a " +
- this.resource.getResourceCase().name());
+ fileSize -= bufSize;
+ if (fileSize == 0L)
+ break;
}
+
+ fis.close();
+ streamOs.close();
+ logger.info("Completed local receiver stream for transfer {}", context.getTransferId());
}
}
diff --git a/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalSender.java b/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalSender.java
index 315fa25..e4d5e48 100644
--- a/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalSender.java
+++ b/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalSender.java
@@ -18,12 +18,7 @@
package org.apache.airavata.mft.transport.local;
import org.apache.airavata.mft.core.ConnectorContext;
-import org.apache.airavata.mft.core.ResourceTypes;
import org.apache.airavata.mft.core.api.Connector;
-import org.apache.airavata.mft.resource.client.ResourceServiceClient;
-import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
-import org.apache.airavata.mft.resource.stubs.local.resource.LocalResource;
-import org.apache.airavata.mft.resource.stubs.local.resource.LocalResourceGetRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,16 +28,12 @@
private static final Logger logger = LoggerFactory.getLogger(LocalSender.class);
- private LocalResource resource;
private boolean initialized;
@Override
- public void init(String resourceId, String credentialToken, String resourceServiceHost, int resourceServicePort,
+ public void init(String storageId, String credentialToken, String resourceServiceHost, int resourceServicePort,
String secretServiceHost, int secretServicePort) throws Exception {
this.initialized = true;
-
- ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
- this.resource = resourceClient.local().getLocalResource(LocalResourceGetRequest.newBuilder().setResourceId(resourceId).build());
}
@Override
@@ -57,49 +48,42 @@
}
@Override
- public void startStream(ConnectorContext context) throws Exception {
+ public void startStream(String targetPath, ConnectorContext context) throws Exception {
logger.info("Starting local sender stream for transfer {}", context.getTransferId());
checkInitialized();
- if (ResourceTypes.FILE.equals(this.resource.getResourceCase().name())) {
- InputStream in = context.getStreamBuffer().getInputStream();
- long fileSize = context.getMetadata().getResourceSize();
- OutputStream fos = new FileOutputStream(resource.getFile().getResourcePath());
+ InputStream in = context.getStreamBuffer().getInputStream();
+ long fileSize = context.getMetadata().getResourceSize();
+ OutputStream fos = new FileOutputStream(targetPath);
- byte[] buf = new byte[1024];
- while (true) {
- int bufSize = 0;
+ byte[] buf = new byte[1024];
+ while (true) {
+ int bufSize = 0;
- if (buf.length < fileSize) {
- bufSize = buf.length;
- } else {
- bufSize = (int) fileSize;
- }
- bufSize = in.read(buf, 0, bufSize);
+ if (buf.length < fileSize) {
+ bufSize = buf.length;
+ } else {
+ bufSize = (int) fileSize;
+ }
+ bufSize = in.read(buf, 0, bufSize);
- if (bufSize < 0) {
- break;
- }
-
- fos.write(buf, 0, bufSize);
- fos.flush();
-
- fileSize -= bufSize;
- if (fileSize == 0L)
- break;
+ if (bufSize < 0) {
+ break;
}
- in.close();
- fos.close();
+ fos.write(buf, 0, bufSize);
+ fos.flush();
- logger.info("Completed local sender stream for transfer {}", context.getTransferId());
- } else {
- logger.error("Resource {} should be a FILE type. Found a {}",
- this.resource.getResourceId(), this.resource.getResourceCase().name());
- throw new Exception("Resource " + this.resource.getResourceId() + " should be a FILE type. Found a " +
- this.resource.getResourceCase().name());
+ fileSize -= bufSize;
+ if (fileSize == 0L)
+ break;
}
+
+ in.close();
+ fos.close();
+
+ logger.info("Completed local sender stream for transfer {}", context.getTransferId());
}
}
diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3MetadataCollector.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3MetadataCollector.java
index 572affd..58be5b6 100644
--- a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3MetadataCollector.java
+++ b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3MetadataCollector.java
@@ -30,8 +30,11 @@
import org.apache.airavata.mft.credential.stubs.s3.S3SecretGetRequest;
import org.apache.airavata.mft.resource.client.ResourceServiceClient;
import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
+import org.apache.airavata.mft.resource.stubs.common.FileResource;
import org.apache.airavata.mft.resource.stubs.s3.resource.S3Resource;
import org.apache.airavata.mft.resource.stubs.s3.resource.S3ResourceGetRequest;
+import org.apache.airavata.mft.resource.stubs.s3.storage.S3Storage;
+import org.apache.airavata.mft.resource.stubs.s3.storage.S3StorageGetRequest;
import org.apache.airavata.mft.secret.client.SecretServiceClient;
import org.apache.airavata.mft.secret.client.SecretServiceClientBuilder;
@@ -85,7 +88,7 @@
}
@Override
- public FileResourceMetadata getFileResourceMetadata(String parentResourceId, String resourcePath, String credentialToken) throws Exception {
+ public FileResourceMetadata getFileResourceMetadata(String storageId, String resourcePath, String credentialToken) throws Exception {
throw new UnsupportedOperationException("Method not implemented");
}
@@ -94,7 +97,7 @@
throw new UnsupportedOperationException("Method not implemented"); }
@Override
- public DirectoryResourceMetadata getDirectoryResourceMetadata(String parentResourceId, String resourcePath, String credentialToken) throws Exception {
+ public DirectoryResourceMetadata getDirectoryResourceMetadata(String storageId, String resourcePath, String credentialToken) throws Exception {
throw new UnsupportedOperationException("Method not implemented");
}
@@ -105,6 +108,23 @@
ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
S3Resource s3Resource = resourceClient.s3().getS3Resource(S3ResourceGetRequest.newBuilder().setResourceId(resourceId).build());
+ return isAvailable(s3Resource, credentialToken);
+ }
+
+ @Override
+ public Boolean isAvailable(String storageId, String resourcePath, String credentialToken) throws Exception {
+ checkInitialized();
+ ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
+ S3Storage s3Storage = resourceClient.s3().getS3Storage(S3StorageGetRequest.newBuilder().setStorageId(storageId).build());
+ S3Resource s3Resource = S3Resource.newBuilder()
+ .setFile(FileResource.newBuilder().setResourcePath(resourcePath).build())
+ .setS3Storage(s3Storage).build();
+
+ return isAvailable(s3Resource, credentialToken);
+ }
+
+ private Boolean isAvailable(S3Resource s3Resource, String credentialToken) throws Exception {
+
SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
S3Secret s3Secret = secretClient.s3().getS3Secret(S3SecretGetRequest.newBuilder().setSecretId(credentialToken).build());
diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Receiver.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Receiver.java
index ddfd425..2e7e816 100644
--- a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Receiver.java
+++ b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Receiver.java
@@ -32,6 +32,8 @@
import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
import org.apache.airavata.mft.resource.stubs.s3.resource.S3Resource;
import org.apache.airavata.mft.resource.stubs.s3.resource.S3ResourceGetRequest;
+import org.apache.airavata.mft.resource.stubs.s3.storage.S3Storage;
+import org.apache.airavata.mft.resource.stubs.s3.storage.S3StorageGetRequest;
import org.apache.airavata.mft.secret.client.SecretServiceClient;
import org.apache.airavata.mft.secret.client.SecretServiceClientBuilder;
import org.slf4j.Logger;
@@ -43,15 +45,15 @@
private static final Logger logger = LoggerFactory.getLogger(S3Receiver.class);
- private S3Resource s3Resource;
private AmazonS3 s3Client;
+ private S3Storage s3Storage;
@Override
- public void init(String resourceId, String credentialToken, String resourceServiceHost, int resourceServicePort,
- String secretServiceHost, int secretServicePort) throws Exception {
+ public void init(String storageId, String credentialToken, String resourceServiceHost, int resourceServicePort,
+ String secretServiceHost, int secretServicePort) throws Exception {
ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
- this.s3Resource = resourceClient.s3().getS3Resource(S3ResourceGetRequest.newBuilder().setResourceId(resourceId).build());
+ this.s3Storage = resourceClient.s3().getS3Storage(S3StorageGetRequest.newBuilder().setStorageId(storageId).build());
SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
S3Secret s3Secret = secretClient.s3().getS3Secret(S3SecretGetRequest.newBuilder().setSecretId(credentialToken).build());
@@ -59,7 +61,7 @@
s3Client = AmazonS3ClientBuilder.standard()
.withCredentials(new AWSStaticCredentialsProvider(awsCreds))
- .withRegion(s3Resource.getS3Storage().getRegion())
+ .withRegion(s3Storage.getRegion())
.build();
}
@@ -69,31 +71,23 @@
}
@Override
- public void startStream(ConnectorContext context) throws Exception {
+ public void startStream(String targetPath, ConnectorContext context) throws Exception {
- if (ResourceTypes.FILE.equals(this.s3Resource.getResourceCase().name())) {
- logger.info("Starting S3 Receiver stream for transfer {}", context.getTransferId());
+ logger.info("Starting S3 Receiver stream for transfer {}", context.getTransferId());
- S3Object s3object = s3Client.getObject(s3Resource.getS3Storage().getBucketName(), s3Resource.getFile().getResourcePath());
- S3ObjectInputStream inputStream = s3object.getObjectContent();
+ S3Object s3object = s3Client.getObject(this.s3Storage.getBucketName(), targetPath);
+ S3ObjectInputStream inputStream = s3object.getObjectContent();
- OutputStream os = context.getStreamBuffer().getOutputStream();
- int read;
- long bytes = 0;
- while ((read = inputStream.read()) != -1) {
- bytes++;
- os.write(read);
- }
- os.flush();
- os.close();
-
- logger.info("Completed S3 Receiver stream for transfer {}", context.getTransferId());
-
- } else {
- logger.error("Resource {} should be a FILE type. Found a {}",
- this.s3Resource.getResourceId(), this.s3Resource.getResourceCase().name());
- throw new Exception("Resource " + this.s3Resource.getResourceId() + " should be a FILE type. Found a " +
- this.s3Resource.getResourceCase().name());
+ OutputStream os = context.getStreamBuffer().getOutputStream();
+ int read;
+ long bytes = 0;
+ while ((read = inputStream.read()) != -1) {
+ bytes++;
+ os.write(read);
}
+ os.flush();
+ os.close();
+
+ logger.info("Completed S3 Receiver stream for transfer {}", context.getTransferId());
}
}
diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Sender.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Sender.java
index 095ed3c..a7a640c 100644
--- a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Sender.java
+++ b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Sender.java
@@ -31,6 +31,8 @@
import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
import org.apache.airavata.mft.resource.stubs.s3.resource.S3Resource;
import org.apache.airavata.mft.resource.stubs.s3.resource.S3ResourceGetRequest;
+import org.apache.airavata.mft.resource.stubs.s3.storage.S3Storage;
+import org.apache.airavata.mft.resource.stubs.s3.storage.S3StorageGetRequest;
import org.apache.airavata.mft.secret.client.SecretServiceClient;
import org.apache.airavata.mft.secret.client.SecretServiceClientBuilder;
import org.slf4j.Logger;
@@ -41,13 +43,13 @@
private static final Logger logger = LoggerFactory.getLogger(S3Sender.class);
private AmazonS3 s3Client;
- private S3Resource s3Resource;
+ private S3Storage s3Storage;
@Override
- public void init(String resourceId, String credentialToken, String resourceServiceHost, int resourceServicePort, String secretServiceHost, int secretServicePort) throws Exception {
+ public void init(String storageId, String credentialToken, String resourceServiceHost, int resourceServicePort, String secretServiceHost, int secretServicePort) throws Exception {
ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
- this.s3Resource = resourceClient.s3().getS3Resource(S3ResourceGetRequest.newBuilder().setResourceId(resourceId).build());
+ this.s3Storage = resourceClient.s3().getS3Storage(S3StorageGetRequest.newBuilder().setStorageId(storageId).build());
SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
S3Secret s3Secret = secretClient.s3().getS3Secret(S3SecretGetRequest.newBuilder().setSecretId(credentialToken).build());
@@ -55,7 +57,7 @@
s3Client = AmazonS3ClientBuilder.standard()
.withCredentials(new AWSStaticCredentialsProvider(awsCreds))
- .withRegion(s3Resource.getS3Storage().getRegion())
+ .withRegion(s3Storage.getRegion())
.build();
}
@@ -65,23 +67,14 @@
}
@Override
- public void startStream(ConnectorContext context) throws Exception {
+ public void startStream(String targetPath, ConnectorContext context) throws Exception {
logger.info("Starting S3 Sender stream for transfer {}", context.getTransferId());
logger.info("Content length for transfer {} {}", context.getTransferId(), context.getMetadata().getResourceSize());
ObjectMetadata metadata = new ObjectMetadata();
metadata.setContentLength(context.getMetadata().getResourceSize());
- if (ResourceTypes.FILE.equals(this.s3Resource.getResourceCase().name())) {
- s3Client.putObject(this.s3Resource.getS3Storage().getBucketName(), this.s3Resource.getFile().getResourcePath(),
- context.getStreamBuffer().getInputStream(), metadata);
- logger.info("Completed S3 Sender stream for transfer {}", context.getTransferId());
- } else {
- logger.error("Resource {} should be a FILE type. Found a {}",
- this.s3Resource.getResourceId(), this.s3Resource.getResourceCase().name());
- throw new Exception("Resource " + this.s3Resource.getResourceId() + " should be a FILE type. Found a " +
- this.s3Resource.getResourceCase().name());
- }
-
+ s3Client.putObject(this.s3Storage.getBucketName(), targetPath, context.getStreamBuffer().getInputStream(), metadata);
+ logger.info("Completed S3 Sender stream for transfer {}", context.getTransferId());
}
}
diff --git a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPMetadataCollector.java b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPMetadataCollector.java
index d0086a5..ac09acb 100644
--- a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPMetadataCollector.java
+++ b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPMetadataCollector.java
@@ -40,6 +40,9 @@
import org.apache.airavata.mft.resource.stubs.common.FileResource;
import org.apache.airavata.mft.resource.stubs.scp.resource.SCPResource;
import org.apache.airavata.mft.resource.stubs.scp.resource.SCPResourceGetRequest;
+import org.apache.airavata.mft.resource.stubs.scp.storage.SCPStorage;
+import org.apache.airavata.mft.resource.stubs.scp.storage.SCPStorageCreateRequest;
+import org.apache.airavata.mft.resource.stubs.scp.storage.SCPStorageGetRequest;
import org.apache.airavata.mft.secret.client.SecretServiceClient;
import org.apache.airavata.mft.secret.client.SecretServiceClientBuilder;
import org.apache.commons.io.IOUtils;
@@ -76,7 +79,7 @@
}
}
- private FileResourceMetadata getFileResourceMetadata(SCPResource scpResource, SCPSecret scpSecret, String parentResourceId) throws Exception {
+ private FileResourceMetadata getFileResourceMetadata(SCPResource scpResource, SCPSecret scpSecret) throws Exception {
try (SSHClient sshClient = getSSHClient(scpResource, scpSecret)) {
logger.info("Fetching metadata for resource {} in {}", scpResource.getFile().getResourcePath(), scpResource.getScpStorage().getHost());
@@ -89,8 +92,6 @@
metadata.setResourceSize(lstat.getSize());
metadata.setCreatedTime(lstat.getAtime());
metadata.setUpdateTime(lstat.getMtime());
- metadata.setParentResourceId(parentResourceId);
- metadata.setParentResourceType("SCP");
metadata.setFriendlyName(new File(scpResource.getFile().getResourcePath()).getName());
metadata.setResourcePath(scpResource.getFile().getResourcePath());
@@ -127,28 +128,26 @@
SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
SCPSecret scpSecret = secretClient.scp().getSCPSecret(SCPSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
- return getFileResourceMetadata(scpResource, scpSecret, resourceId);
+ return getFileResourceMetadata(scpResource, scpSecret);
}
@Override
- public FileResourceMetadata getFileResourceMetadata(String parentResourceId, String resourcePath, String credentialToken) throws Exception {
+ public FileResourceMetadata getFileResourceMetadata(String storageId, String resourcePath, String credentialToken) throws Exception {
ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
- SCPResource parentSCPResource = resourceClient.scp().getSCPResource(SCPResourceGetRequest.newBuilder().setResourceId(parentResourceId).build());
+ SCPStorage scpStorage = resourceClient.scp().getSCPStorage(SCPStorageGetRequest.newBuilder().setStorageId(storageId).build());
SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
SCPSecret scpSecret = secretClient.scp().getSCPSecret(SCPSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
- validateParent(parentSCPResource, resourcePath);
-
SCPResource scpResource = SCPResource.newBuilder()
.setFile(FileResource.newBuilder()
.setResourcePath(resourcePath).build())
- .setScpStorage(parentSCPResource.getScpStorage()).build();
+ .setScpStorage(scpStorage).build();
- return getFileResourceMetadata(scpResource, scpSecret, parentResourceId);
+ return getFileResourceMetadata(scpResource, scpSecret);
}
- private DirectoryResourceMetadata getDirectoryResourceMetadata(SCPResource scpResource, SCPSecret scpSecret, String parentResourceId) throws Exception {
+ private DirectoryResourceMetadata getDirectoryResourceMetadata(SCPResource scpResource, SCPSecret scpSecret) throws Exception {
try (SSHClient sshClient = getSSHClient(scpResource, scpSecret)) {
logger.info("Fetching metadata for resource {} in {}", scpResource.getFile().getResourcePath(), scpResource.getScpStorage().getHost());
@@ -167,9 +166,7 @@
.withFriendlyName(rri.getName())
.withResourcePath(rri.getPath())
.withCreatedTime(rri.getAttributes().getAtime())
- .withUpdateTime(rri.getAttributes().getMtime())
- .withParentResourceId(parentResourceId)
- .withParentResourceType("SCP");
+ .withUpdateTime(rri.getAttributes().getMtime());
dirMetadataBuilder = dirMetadataBuilder.withDirectory(childDirBuilder.build());
}
@@ -178,20 +175,16 @@
.withFriendlyName(rri.getName())
.withResourcePath(rri.getPath())
.withCreatedTime(rri.getAttributes().getAtime())
- .withUpdateTime(rri.getAttributes().getMtime())
- .withParentResourceId(parentResourceId)
- .withParentResourceType("SCP");
+ .withUpdateTime(rri.getAttributes().getMtime());
dirMetadataBuilder = dirMetadataBuilder.withFile(childFileBuilder.build());
}
}
dirMetadataBuilder = dirMetadataBuilder.withFriendlyName(new File(scpResource.getDirectory().getResourcePath()).getName())
- .withResourcePath(parentResourceId)
+ .withResourcePath(scpResource.getDirectory().getResourcePath())
.withCreatedTime(lsStat.getAtime())
- .withUpdateTime(lsStat.getMtime())
- .withParentResourceId(parentResourceId)
- .withParentResourceType("SCP");
+ .withUpdateTime(lsStat.getMtime());
return dirMetadataBuilder.build();
}
}
@@ -206,24 +199,23 @@
SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
SCPSecret scpSecret = secretClient.scp().getSCPSecret(SCPSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
- return getDirectoryResourceMetadata(scpPResource, scpSecret, resourceId);
+ return getDirectoryResourceMetadata(scpPResource, scpSecret);
}
@Override
- public DirectoryResourceMetadata getDirectoryResourceMetadata(String parentResourceId, String resourcePath, String credentialToken) throws Exception {
+ public DirectoryResourceMetadata getDirectoryResourceMetadata(String storageId, String resourcePath, String credentialToken) throws Exception {
ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
- SCPResource parentSCPPResource = resourceClient.scp().getSCPResource(SCPResourceGetRequest.newBuilder().setResourceId(parentResourceId).build());
+ SCPStorage scpStorage = resourceClient.scp().getSCPStorage(SCPStorageGetRequest.newBuilder().setStorageId(storageId).build());
SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
SCPSecret scpSecret = secretClient.scp().getSCPSecret(SCPSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
- validateParent(parentSCPPResource, resourcePath);
- SCPResource scpResource = SCPResource.newBuilder().setScpStorage(parentSCPPResource.getScpStorage())
+ SCPResource scpResource = SCPResource.newBuilder().setScpStorage(scpStorage)
.setDirectory(DirectoryResource.newBuilder()
.setResourcePath(resourcePath).build()).build();
- return getDirectoryResourceMetadata(scpResource, scpSecret, parentResourceId);
+ return getDirectoryResourceMetadata(scpResource, scpSecret);
}
@Override
@@ -233,6 +225,23 @@
ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
SCPResource scpResource = resourceClient.scp().getSCPResource(SCPResourceGetRequest.newBuilder().setResourceId(resourceId).build());
+ return isAvailable(scpResource, credentialToken);
+ }
+
+ @Override
+ public Boolean isAvailable(String storageId, String resourcePath, String credentialToken) throws Exception {
+ checkInitialized();
+ ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
+ SCPStorage scpStorage = resourceClient.scp().getSCPStorage(SCPStorageGetRequest.newBuilder().setStorageId(storageId).build());
+ SCPResource scpResource = SCPResource.newBuilder()
+ .setFile(FileResource.newBuilder().setResourcePath(resourcePath).build())
+ .setScpStorage(scpStorage).build();
+
+ return isAvailable(scpResource, credentialToken);
+ }
+
+ public Boolean isAvailable(SCPResource scpResource, String credentialToken) throws Exception {
+
SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
SCPSecret scpSecret = secretClient.scp().getSCPSecret(SCPSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
diff --git a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPReceiver.java b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPReceiver.java
index 127ea8c..516ff7d 100644
--- a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPReceiver.java
+++ b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPReceiver.java
@@ -21,14 +21,13 @@
import com.jcraft.jsch.Session;
import org.apache.airavata.mft.core.ConnectorContext;
import org.apache.airavata.mft.core.DoubleStreamingBuffer;
-import org.apache.airavata.mft.core.ResourceTypes;
import org.apache.airavata.mft.core.api.Connector;
import org.apache.airavata.mft.credential.stubs.scp.SCPSecret;
import org.apache.airavata.mft.credential.stubs.scp.SCPSecretGetRequest;
import org.apache.airavata.mft.resource.client.ResourceServiceClient;
import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
-import org.apache.airavata.mft.resource.stubs.scp.resource.SCPResource;
-import org.apache.airavata.mft.resource.stubs.scp.resource.SCPResourceGetRequest;
+import org.apache.airavata.mft.resource.stubs.scp.storage.SCPStorage;
+import org.apache.airavata.mft.resource.stubs.scp.storage.SCPStorageGetRequest;
import org.apache.airavata.mft.secret.client.SecretServiceClient;
import org.apache.airavata.mft.secret.client.SecretServiceClientBuilder;
import org.slf4j.Logger;
@@ -43,30 +42,37 @@
boolean initialized = false;
private Session session;
- private SCPResource scpResource;
- public void init(String resourceId, String credentialToken, String resourceServiceHost, int resourceServicePort,
+ public void init(String storageId, String credentialToken, String resourceServiceHost, int resourceServicePort,
String secretServiceHost, int secretServicePort) throws Exception {
+ if (initialized) {
+ destroy();
+ }
+
this.initialized = true;
ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
- this.scpResource = resourceClient.scp().getSCPResource(SCPResourceGetRequest.newBuilder().setResourceId(resourceId).build());
+ SCPStorage scpStorage = resourceClient.scp().getSCPStorage(SCPStorageGetRequest.newBuilder().setStorageId(storageId).build());
SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
SCPSecret scpSecret = secretClient.scp().getSCPSecret(SCPSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
this.session = SCPTransportUtil.createSession(
- scpResource.getScpStorage().getUser(),
- scpResource.getScpStorage().getHost(),
- scpResource.getScpStorage().getPort(),
+ scpStorage.getUser(),
+ scpStorage.getHost(),
+ scpStorage.getPort(),
scpSecret.getPrivateKey().getBytes(),
scpSecret.getPublicKey().getBytes(),
scpSecret.getPassphrase().equals("")? null : scpSecret.getPassphrase().getBytes());
}
public void destroy() {
-
+ try {
+ this.session.disconnect();
+ } catch (Exception e) {
+ logger.error("Errored while disconnecting session", e);
+ }
}
private void checkInitialized() {
@@ -75,23 +81,16 @@
}
}
- public void startStream(ConnectorContext context) throws Exception {
+ public void startStream(String targetPath, ConnectorContext context) throws Exception {
checkInitialized();
if (session == null) {
logger.error("Session can not be null. Make sure that SCP Receiver is properly initialized");
throw new Exception("Session can not be null. Make sure that SCP Receiver is properly initialized");
}
- if (ResourceTypes.FILE.equals(this.scpResource.getResourceCase().name())) {
- transferRemoteToStream(session, this.scpResource.getFile().getResourcePath(), context.getStreamBuffer());
- logger.info("SCP Receive completed. Transfer {}", context.getTransferId());
+ transferRemoteToStream(session, targetPath, context.getStreamBuffer());
+ logger.info("SCP Receive completed. Transfer {}", context.getTransferId());
- } else {
- logger.error("Resource {} should be a FILE type. Found a {}",
- this.scpResource.getResourceId(), this.scpResource.getResourceCase().name());
- throw new Exception("Resource " + this.scpResource.getResourceId() + " should be a FILE type. Found a " +
- this.scpResource.getResourceCase().name());
- }
}
private void transferRemoteToStream(Session session, String from, DoubleStreamingBuffer streamBuffer) throws Exception {
diff --git a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPSender.java b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPSender.java
index b552f0e..01b8ed4 100644
--- a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPSender.java
+++ b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPSender.java
@@ -30,6 +30,8 @@
import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
import org.apache.airavata.mft.resource.stubs.scp.resource.SCPResource;
import org.apache.airavata.mft.resource.stubs.scp.resource.SCPResourceGetRequest;
+import org.apache.airavata.mft.resource.stubs.scp.storage.SCPStorage;
+import org.apache.airavata.mft.resource.stubs.scp.storage.SCPStorageGetRequest;
import org.apache.airavata.mft.secret.client.SecretServiceClient;
import org.apache.airavata.mft.secret.client.SecretServiceClientBuilder;
import org.slf4j.Logger;
@@ -44,27 +46,28 @@
boolean initialized = false;
private Session session;
- private SCPResource scpResource;
- public void init(String resourceId, String credentialToken, String resourceServiceHost, int resourceServicePort,
+ public void init(String storageId, String credentialToken, String resourceServiceHost, int resourceServicePort,
String secretServiceHost, int secretServicePort) throws Exception {
+ if (initialized) {
+ destroy();
+ }
+
this.initialized = true;
ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
- this.scpResource = resourceClient.scp().getSCPResource(SCPResourceGetRequest.newBuilder().setResourceId(resourceId).build());
+ SCPStorage scpStorage = resourceClient.scp().getSCPStorage(SCPStorageGetRequest.newBuilder().setStorageId(storageId).build());
SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
SCPSecret scpSecret = secretClient.scp().getSCPSecret(SCPSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
- logger.info("Creating a ssh session for {}@{}:{}",
- scpResource.getScpStorage().getUser(), scpResource.getScpStorage().getHost(),
- scpResource.getScpStorage().getPort());
+ logger.info("Creating a ssh session for {}@{}:{}", scpStorage.getUser(), scpStorage.getHost(), scpStorage.getPort());
this.session = SCPTransportUtil.createSession(
- scpResource.getScpStorage().getUser(),
- scpResource.getScpStorage().getHost(),
- scpResource.getScpStorage().getPort(),
+ scpStorage.getUser(),
+ scpStorage.getHost(),
+ scpStorage.getPort(),
scpSecret.getPrivateKey().getBytes(),
scpSecret.getPublicKey().getBytes(),
scpSecret.getPassphrase().equals("")? null : scpSecret.getPassphrase().getBytes());
@@ -72,11 +75,11 @@
public void destroy() {
- //try {
- // this.session.disconnect();
- //} catch (Exception e) {
- // logger.error("Errored while disconnecting session", e);
- //}
+ try {
+ this.session.disconnect();
+ } catch (Exception e) {
+ logger.error("Errored while disconnecting session", e);
+ }
}
private void checkInitialized() {
@@ -85,7 +88,7 @@
}
}
- public void startStream(ConnectorContext context) throws Exception {
+ public void startStream(String targetPath, ConnectorContext context) throws Exception {
checkInitialized();
if (session == null) {
@@ -93,16 +96,8 @@
throw new Exception("Session can not be null. Make sure that SCP Sender is properly initialized");
}
try {
- if (ResourceTypes.FILE.equals(this.scpResource.getResourceCase().name())) {
- copyLocalToRemote(this.session, this.scpResource.getFile().getResourcePath(), context.getStreamBuffer(), context.getMetadata().getResourceSize());
- logger.info("SCP send to transfer {} completed", context.getTransferId());
-
- } else {
- logger.error("Resource {} should be a FILE type. Found a {}",
- this.scpResource.getResourceId(), this.scpResource.getResourceCase().name());
- throw new Exception("Resource " + this.scpResource.getResourceId() + " should be a FILE type. Found a " +
- this.scpResource.getResourceCase().name());
- }
+ copyLocalToRemote(this.session, targetPath, context.getStreamBuffer(), context.getMetadata().getResourceSize());
+ logger.info("SCP send to transfer {} completed", context.getTransferId());
} catch (Exception e) {
logger.error("Errored while streaming to remote scp server. Transfer {}", context.getTransferId() , e);