blob: 72306199d7ba06f00f0d3abec2517de611dee0a7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.airavata.mft.api.handler;
import com.google.protobuf.util.JsonFormat;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import org.apache.airavata.mft.admin.MFTConsulClient;
import org.apache.airavata.mft.admin.SyncRPCClient;
import org.apache.airavata.mft.admin.models.AgentInfo;
import org.apache.airavata.mft.admin.models.TransferState;
import org.apache.airavata.mft.admin.models.rpc.SyncRPCRequest;
import org.apache.airavata.mft.admin.models.rpc.SyncRPCResponse;
import org.apache.airavata.mft.agent.stub.*;
import org.apache.airavata.mft.api.service.*;
import org.apache.airavata.mft.credential.stubs.azure.AzureSecret;
import org.apache.airavata.mft.credential.stubs.azure.AzureSecretGetRequest;
import org.apache.airavata.mft.credential.stubs.box.BoxSecret;
import org.apache.airavata.mft.credential.stubs.box.BoxSecretGetRequest;
import org.apache.airavata.mft.credential.stubs.dropbox.DropboxSecret;
import org.apache.airavata.mft.credential.stubs.dropbox.DropboxSecretGetRequest;
import org.apache.airavata.mft.credential.stubs.ftp.FTPSecret;
import org.apache.airavata.mft.credential.stubs.ftp.FTPSecretGetRequest;
import org.apache.airavata.mft.credential.stubs.gcs.GCSSecret;
import org.apache.airavata.mft.credential.stubs.gcs.GCSSecretGetRequest;
import org.apache.airavata.mft.credential.stubs.odata.ODataSecret;
import org.apache.airavata.mft.credential.stubs.odata.ODataSecretGetRequest;
import org.apache.airavata.mft.credential.stubs.s3.S3Secret;
import org.apache.airavata.mft.credential.stubs.s3.S3SecretGetRequest;
import org.apache.airavata.mft.credential.stubs.scp.SCPSecret;
import org.apache.airavata.mft.credential.stubs.scp.SCPSecretGetRequest;
import org.apache.airavata.mft.credential.stubs.swift.SwiftSecret;
import org.apache.airavata.mft.credential.stubs.swift.SwiftSecretGetRequest;
import org.apache.airavata.mft.resource.client.StorageServiceClient;
import org.apache.airavata.mft.resource.client.StorageServiceClientBuilder;
import org.apache.airavata.mft.resource.stubs.azure.storage.AzureStorage;
import org.apache.airavata.mft.resource.stubs.azure.storage.AzureStorageGetRequest;
import org.apache.airavata.mft.resource.stubs.box.storage.BoxStorage;
import org.apache.airavata.mft.resource.stubs.box.storage.BoxStorageGetRequest;
import org.apache.airavata.mft.resource.stubs.dropbox.storage.DropboxStorage;
import org.apache.airavata.mft.resource.stubs.dropbox.storage.DropboxStorageGetRequest;
import org.apache.airavata.mft.resource.stubs.ftp.storage.FTPStorage;
import org.apache.airavata.mft.resource.stubs.ftp.storage.FTPStorageGetRequest;
import org.apache.airavata.mft.resource.stubs.gcs.storage.GCSStorage;
import org.apache.airavata.mft.resource.stubs.gcs.storage.GCSStorageGetRequest;
import org.apache.airavata.mft.resource.stubs.local.storage.LocalStorage;
import org.apache.airavata.mft.resource.stubs.local.storage.LocalStorageGetRequest;
import org.apache.airavata.mft.resource.stubs.odata.storage.ODataStorage;
import org.apache.airavata.mft.resource.stubs.odata.storage.ODataStorageGetRequest;
import org.apache.airavata.mft.resource.stubs.s3.storage.S3Storage;
import org.apache.airavata.mft.resource.stubs.s3.storage.S3StorageGetRequest;
import org.apache.airavata.mft.resource.stubs.scp.storage.SCPStorage;
import org.apache.airavata.mft.resource.stubs.scp.storage.SCPStorageGetRequest;
import org.apache.airavata.mft.resource.stubs.storage.common.StorageTypeResolveRequest;
import org.apache.airavata.mft.resource.stubs.storage.common.StorageTypeResolveResponse;
import org.apache.airavata.mft.resource.stubs.swift.storage.SwiftStorage;
import org.apache.airavata.mft.resource.stubs.swift.storage.SwiftStorageGetRequest;
import org.apache.airavata.mft.secret.client.SecretServiceClient;
import org.apache.airavata.mft.secret.client.SecretServiceClientBuilder;
import org.apache.commons.lang3.tuple.Pair;
import org.dozer.DozerBeanMapper;
import org.lognet.springboot.grpc.GRpcService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;
@GRpcService
public class MFTApiHandler extends MFTTransferServiceGrpc.MFTTransferServiceImplBase {
private static final Logger logger = LoggerFactory.getLogger(MFTApiHandler.class);
@Autowired
private MFTConsulClient mftConsulClient;
@Autowired
private DozerBeanMapper dozerBeanMapper;
@Autowired
private SyncRPCClient agentRPCClient;
@org.springframework.beans.factory.annotation.Value("${resource.service.host}")
private String resourceServiceHost;
@org.springframework.beans.factory.annotation.Value("${resource.service.port}")
private int resourceServicePort;
@org.springframework.beans.factory.annotation.Value("${secret.service.host}")
private String secretServiceHost;
@org.springframework.beans.factory.annotation.Value("${secret.service.port}")
private int secretServicePort;
@Override
public void submitTransfer(TransferApiRequest request, StreamObserver<TransferApiResponse> responseObserver) {
try {
String transferId = mftConsulClient.submitTransfer(request);
logger.info("Submitted the transfer request {}", transferId);
mftConsulClient.saveTransferState(transferId, null, new TransferState()
.setUpdateTimeMils(System.currentTimeMillis())
.setState("RECEIVED").setPercentage(0)
.setPublisher("api")
.setDescription("Received transfer job " + transferId));
responseObserver.onNext(TransferApiResponse.newBuilder().setTransferId(transferId).build());
responseObserver.onCompleted();
} catch (Exception e) {
logger.error("Error in submitting transfer request", e);
responseObserver.onError(Status.INTERNAL
.withDescription("Failed to submit transfer request. " + e.getMessage())
.asException());
}
}
@Override
public void submitHttpUpload(HttpUploadApiRequest request, StreamObserver<HttpUploadApiResponse> responseObserver) {
super.submitHttpUpload(request, responseObserver);
}
@Override
public void submitHttpDownload(HttpDownloadApiRequest request, StreamObserver<HttpDownloadApiResponse> responseObserver) {
try {
// TODO : Automatically derive agent if the target agent is empty
logger.info("Processing submit http download for resource path {}", request.getResourcePath());
String targetAgent = derriveTargetAgent(request.getTargetAgent());
SyncRPCRequest.SyncRPCRequestBuilder requestBuilder = SyncRPCRequest.SyncRPCRequestBuilder.builder()
.withAgentId(targetAgent)
.withMessageId(UUID.randomUUID().toString())
.withMethod("submitHttpDownload")
.withParameter("resourcePath", request.getResourcePath())
.withParameter("sourceStorageId", request.getSourceStorageId())
.withParameter("sourceToken", request.getSourceSecretId())
.withParameter("mftAuthorizationToken", JsonFormat.printer().print(request.getMftAuthorizationToken()));
SyncRPCResponse rpcResponse = agentRPCClient.sendSyncRequest(requestBuilder.build());
switch (rpcResponse.getResponseStatus()) {
case SUCCESS:
String url = rpcResponse.getResponseAsStr();
HttpDownloadApiResponse downloadResponse = HttpDownloadApiResponse.newBuilder()
.setUrl(url)
.setTargetAgent(request.getTargetAgent()).build();
responseObserver.onNext(downloadResponse);
responseObserver.onCompleted();
return;
case FAIL:
logger.error("Errored while processing the download request to resource path {}. Error msg : {}",
request.getResourcePath(), rpcResponse.getErrorAsStr());
responseObserver.onError(Status.INTERNAL
.withDescription("Errored while processing the the fetch file metadata response. Error msg : " +
rpcResponse.getErrorAsStr())
.asException());
}
} catch (Exception e) {
logger.error("Error while submitting http download request to resource path {}",
request.getResourcePath() , e);
responseObserver.onError(Status.INTERNAL
.withDescription("Failed to submit http download request. " + e.getMessage())
.asException());
}
}
@Override
public void getAllTransferStates(TransferStateApiRequest request, StreamObserver<TransferStateResponse> responseObserver) {
try {
List<TransferState> states = mftConsulClient.getTransferStates(request.getTransferId());
states.forEach(st -> {
TransferStateResponse s = dozerBeanMapper.map(st, TransferStateResponse.newBuilder().getClass()).build();
responseObserver.onNext(s);
});
responseObserver.onCompleted();
} catch (Exception e) {
logger.error("Error in fetching transfer states", e);
responseObserver.onError(Status.INTERNAL
.withDescription("Failed to retrieve transfer states. " + e.getMessage())
.asException());
}
}
@Override
public void getTransferStateSummary(TransferStateApiRequest request, StreamObserver<TransferStateSummaryResponse> responseObserver) {
try {
TransferStateSummaryResponse.Builder stateBuilder = TransferStateSummaryResponse.newBuilder().setPercentage(0);
List<TransferState> transferStates = mftConsulClient.getTransferStates(request.getTransferId());
List<TransferState> mainTransferStatus = transferStates.stream().filter(st -> st.getChildId() == null)
.sorted((st1, st2) -> Long.compare(st2.getUpdateTimeMils(), st1.getUpdateTimeMils()))
.collect(Collectors.toList());
Optional<TransferApiRequest> processedTransferOp = mftConsulClient.getProcessedTransfer(request.getTransferId());
if (processedTransferOp.isPresent()) {
Set<String> completedFiles = new HashSet<>();
Set<String> failedFiles = new HashSet<>();
transferStates.stream().filter(st -> st.getChildId() != null).forEach(st -> {
if (st.getState().equals("COMPLETED")) {
completedFiles.add(st.getChildId());
} else if (st.getState().equals("FAILED")) {
failedFiles.add("FAILED");
}
});
Set<String> pendingFiles = processedTransferOp.get().getEndpointPathsList()
.stream().map(ep -> mftConsulClient.getEndpointPathHash(ep))
.filter(key -> !completedFiles.contains(key) && !failedFiles.contains(key)).collect(Collectors.toSet());
stateBuilder.addAllCompleted(completedFiles);
stateBuilder.addAllFailed(failedFiles);
stateBuilder.addAllProcessing(pendingFiles);
if (!pendingFiles.isEmpty()) {
stateBuilder.setState("IN PROGRESS");
stateBuilder.setPercentage((completedFiles.size() + failedFiles.size()) * 1.0 /
(completedFiles.size() + failedFiles.size() + pendingFiles.size()));
stateBuilder.setDescription("Transfer is in progress");
} else {
if (!failedFiles.isEmpty() && !completedFiles.isEmpty()) {
stateBuilder.setState("PARTIAL FAILURE");
stateBuilder.setDescription("Some file transfers failed");
stateBuilder.setPercentage((completedFiles.size() + failedFiles.size()) * 1.0 /
(completedFiles.size() + failedFiles.size() + pendingFiles.size()));
} else if (!failedFiles.isEmpty()) {
stateBuilder.setState("FAILED");
stateBuilder.setDescription("All file transfers failed");
stateBuilder.setPercentage((completedFiles.size() + failedFiles.size()) * 1.0 /
(completedFiles.size() + failedFiles.size() + pendingFiles.size()));
} else if (!completedFiles.isEmpty()) {
stateBuilder.setState("COMPLETED");
stateBuilder.setState("All file transfers completed");
stateBuilder.setPercentage((completedFiles.size() + failedFiles.size()) * 1.0 /
(completedFiles.size() + failedFiles.size() + pendingFiles.size()));
}
}
responseObserver.onNext(stateBuilder.build());
responseObserver.onCompleted();
} else if (!mainTransferStatus.isEmpty()){
stateBuilder.setState(mainTransferStatus.get(0).getState());
stateBuilder.setPercentage(0);
stateBuilder.setDescription(mainTransferStatus.get(0).getDescription());
responseObserver.onNext(stateBuilder.build());
responseObserver.onCompleted();
} else {
logger.error("There is processed transfer with id {}", request.getTransferId());
responseObserver.onError(Status.NOT_FOUND
.withDescription("There is no processed transfer with id " + request.getTransferId())
.asRuntimeException());
}
} catch (Exception e) {
logger.error("Error in fetching transfer state", e);
responseObserver.onError(Status.INTERNAL
.withDescription("Failed to retrieve transfer state. " + e.getMessage())
.asException());
}
}
private GetResourceMetadataRequest deriveDirectRequest(GetResourceMetadataFromIDsRequest idRequest) {
StorageServiceClient storageClient = StorageServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
StorageTypeResolveResponse storageTypeResp = storageClient.common().resolveStorageType(
StorageTypeResolveRequest.newBuilder().setStorageId(idRequest.getStorageId()).build());
GetResourceMetadataRequest.Builder directReqBuilder = GetResourceMetadataRequest.newBuilder();
directReqBuilder.setResourcePath(idRequest.getResourcePath());
directReqBuilder.setRecursiveSearch(idRequest.getRecursiveSearch());
switch (storageTypeResp.getStorageType()) {
case S3:
S3Storage s3Storage = storageClient.s3()
.getS3Storage(S3StorageGetRequest.newBuilder()
.setStorageId(idRequest.getStorageId()).build());
S3Secret s3Secret = secretClient.s3()
.getS3Secret(S3SecretGetRequest.newBuilder()
.setSecretId(idRequest.getSecretId()).build());
directReqBuilder
.setStorage(StorageWrapper.newBuilder().setS3(s3Storage).build())
.setSecret(SecretWrapper.newBuilder().setS3(s3Secret).build());
break;
case FTP:
FTPStorage ftpStorage = storageClient.ftp()
.getFTPStorage(FTPStorageGetRequest.newBuilder()
.setStorageId(idRequest.getStorageId()).build());
FTPSecret ftpSecret = secretClient.ftp()
.getFTPSecret(FTPSecretGetRequest.newBuilder()
.setSecretId(idRequest.getSecretId()).build());
directReqBuilder
.setStorage(StorageWrapper.newBuilder().setFtp(ftpStorage).build())
.setSecret(SecretWrapper.newBuilder().setFtp(ftpSecret).build());
break;
case LOCAL:
LocalStorage localStorage = storageClient.local()
.getLocalStorage(LocalStorageGetRequest.newBuilder()
.setStorageId(idRequest.getStorageId()).build());
directReqBuilder
.setStorage(StorageWrapper.newBuilder().setLocal(localStorage).build());
break;
case BOX:
BoxStorage boxStorage = storageClient.box()
.getBoxStorage(BoxStorageGetRequest.newBuilder()
.setStorageId(idRequest.getStorageId()).build());
BoxSecret boxSecret = secretClient.box()
.getBoxSecret(BoxSecretGetRequest.newBuilder()
.setSecretId(idRequest.getSecretId()).build());
directReqBuilder
.setStorage(StorageWrapper.newBuilder().setBox(boxStorage).build())
.setSecret(SecretWrapper.newBuilder().setBox(boxSecret).build());
break;
case DROPBOX:
DropboxStorage dropBoxStorage = storageClient.dropbox()
.getDropboxStorage(DropboxStorageGetRequest.newBuilder()
.setStorageId(idRequest.getStorageId()).build());
DropboxSecret dropBoxSecret = secretClient.dropbox()
.getDropboxSecret(DropboxSecretGetRequest.newBuilder()
.setSecretId(idRequest.getSecretId()).build());
directReqBuilder
.setStorage(StorageWrapper.newBuilder().setDropbox(dropBoxStorage).build())
.setSecret(SecretWrapper.newBuilder().setDropbox(dropBoxSecret).build());
break;
case GCS:
GCSStorage gcsStorage = storageClient.gcs()
.getGCSStorage(GCSStorageGetRequest.newBuilder()
.setStorageId(idRequest.getStorageId()).build());
GCSSecret gcsSecret = secretClient.gcs()
.getGCSSecret(GCSSecretGetRequest.newBuilder()
.setSecretId(idRequest.getSecretId()).build());
directReqBuilder
.setStorage(StorageWrapper.newBuilder().setGcs(gcsStorage).build())
.setSecret(SecretWrapper.newBuilder().setGcs(gcsSecret).build());
break;
case AZURE:
AzureStorage azureStorage = storageClient.azure()
.getAzureStorage(AzureStorageGetRequest.newBuilder()
.setStorageId(idRequest.getStorageId()).build());
AzureSecret azureSecret = secretClient.azure()
.getAzureSecret(AzureSecretGetRequest.newBuilder()
.setSecretId(idRequest.getSecretId()).build());
directReqBuilder
.setStorage(StorageWrapper.newBuilder().setAzure(azureStorage).build())
.setSecret(SecretWrapper.newBuilder().setAzure(azureSecret).build());
break;
case SWIFT:
SwiftStorage swiftStorage = storageClient.swift()
.getSwiftStorage(SwiftStorageGetRequest.newBuilder()
.setStorageId(idRequest.getStorageId()).build());
SwiftSecret swiftSecret = secretClient.swift()
.getSwiftSecret(SwiftSecretGetRequest.newBuilder()
.setSecretId(idRequest.getSecretId()).build());
directReqBuilder
.setStorage(StorageWrapper.newBuilder().setSwift(swiftStorage).build())
.setSecret(SecretWrapper.newBuilder().setSwift(swiftSecret).build());
break;
case ODATA:
ODataStorage odataStorage = storageClient.odata()
.getODataStorage(ODataStorageGetRequest.newBuilder()
.setStorageId(idRequest.getStorageId()).build());
ODataSecret odataSecret = secretClient.odata()
.getODataSecret(ODataSecretGetRequest.newBuilder()
.setSecretId(idRequest.getSecretId()).build());
directReqBuilder
.setStorage(StorageWrapper.newBuilder().setOdata(odataStorage).build())
.setSecret(SecretWrapper.newBuilder().setOdata(odataSecret).build());
break;
case SCP:
SCPStorage scpStorage = storageClient.scp()
.getSCPStorage(SCPStorageGetRequest.newBuilder()
.setStorageId(idRequest.getStorageId()).build());
SCPSecret scpSecret = secretClient.scp()
.getSCPSecret(SCPSecretGetRequest.newBuilder()
.setSecretId(idRequest.getSecretId()).build());
directReqBuilder
.setStorage(StorageWrapper.newBuilder().setScp(scpStorage).build())
.setSecret(SecretWrapper.newBuilder().setScp(scpSecret).build());
break;
}
return directReqBuilder.build();
}
@Override
public void getResourceAvailability(FetchResourceMetadataRequest request, StreamObserver<ResourceAvailabilityResponse> responseObserver) {
GetResourceMetadataRequest directRequest = null;
try {
if (request.getRequestCase() == FetchResourceMetadataRequest.RequestCase.DIRECTREQUEST) {
directRequest = request.getDirectRequest();
} else {
directRequest = deriveDirectRequest(request.getIdRequest());
}
String targetAgent = derriveTargetAgent("");
SyncRPCRequest.SyncRPCRequestBuilder requestBuilder = SyncRPCRequest.SyncRPCRequestBuilder.builder()
.withAgentId(targetAgent)
.withMessageId(UUID.randomUUID().toString())
.withParameter("request", JsonFormat.printer().print(directRequest));
requestBuilder.withMethod("getResourceAvailability");
SyncRPCResponse rpcResponse = agentRPCClient.sendSyncRequest(requestBuilder.build());
switch (rpcResponse.getResponseStatus()) {
case SUCCESS:
boolean resourceAvailable = Boolean.parseBoolean(rpcResponse.getResponseAsStr());
responseObserver.onNext(ResourceAvailabilityResponse.newBuilder().setAvailable(resourceAvailable).build());
responseObserver.onCompleted();
return;
case FAIL:
logger.error("Errored while processing the fetch metadata response for resource path {}. Error msg : {}",
directRequest.getResourcePath(), rpcResponse.getErrorAsStr());
responseObserver.onError(Status.INTERNAL
.withDescription("Errored while processing the the fetch file metadata response. Error msg : " +
rpcResponse.getErrorAsStr())
.asException());
}
} catch (Exception e) {
logger.error("Error while fetching resource metadata for resource path " + directRequest.getResourcePath(), e);
responseObserver.onError(Status.INTERNAL
.withDescription("Failed to fetch file resource metadata. " + e.getMessage())
.asException());
}
}
@Override
public void resourceMetadata(FetchResourceMetadataRequest request, StreamObserver<ResourceMetadata> responseObserver) {
GetResourceMetadataRequest directRequest = null;
try {
if (request.getRequestCase() == FetchResourceMetadataRequest.RequestCase.DIRECTREQUEST) {
directRequest = request.getDirectRequest();
} else {
directRequest = deriveDirectRequest(request.getIdRequest());
}
String targetAgent = derriveTargetAgent("");
SyncRPCRequest.SyncRPCRequestBuilder requestBuilder = SyncRPCRequest.SyncRPCRequestBuilder.builder()
.withAgentId(targetAgent)
.withMessageId(UUID.randomUUID().toString())
.withParameter("request", JsonFormat.printer().print(directRequest));
requestBuilder.withMethod("getResourceMetadata");
SyncRPCResponse rpcResponse = agentRPCClient.sendSyncRequest(requestBuilder.build());
switch (rpcResponse.getResponseStatus()) {
case SUCCESS:
ResourceMetadata.Builder resourceMetadataBuilder = ResourceMetadata.newBuilder();
JsonFormat.parser().merge(rpcResponse.getResponseAsStr(), resourceMetadataBuilder);
responseObserver.onNext(resourceMetadataBuilder.build());
responseObserver.onCompleted();
return;
case FAIL:
logger.error("Errored while processing the fetch metadata response for resource path {}. Error msg : {}",
directRequest.getResourcePath(), rpcResponse.getErrorAsStr());
responseObserver.onError(Status.INTERNAL
.withDescription("Errored while processing the the fetch file metadata response. Error msg : " +
rpcResponse.getErrorAsStr())
.asException());
}
} catch (Exception e) {
logger.error("Error while fetching resource metadata for resource path " + directRequest.getResourcePath(), e);
responseObserver.onError(Status.INTERNAL
.withDescription("Failed to fetch file resource metadata. " + e.getMessage())
.asException());
}
}
private String derriveTargetAgent(String targetAgent) throws Exception {
if (targetAgent.isEmpty()) {
List<String> liveAgentIds = mftConsulClient.getLiveAgentIds();
if (liveAgentIds.isEmpty()) {
throw new Exception("No agent is available to perform the operation");
}
targetAgent = liveAgentIds.get(0);
logger.info("Using agent {} for processing the operation", targetAgent);
} else {
Optional<AgentInfo> agentInfo = mftConsulClient.getAgentInfo(targetAgent);
if (agentInfo.isEmpty()) {
throw new Exception("Target agent " + targetAgent + " is not available");
}
}
return targetAgent;
}
}