blob: f6a4e49853c6181c40af2382f1079aca6a2f8a1d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.airavata.datalake.orchestrator.handlers.async;
import org.apache.airavata.datalake.data.orchestrator.api.stub.notification.Notification;
import org.apache.airavata.datalake.data.orchestrator.api.stub.notification.NotificationStatus;
import org.apache.airavata.datalake.data.orchestrator.api.stub.notification.NotificationStatusRegisterRequest;
import org.apache.airavata.datalake.drms.resource.GenericResource;
import org.apache.airavata.datalake.drms.storage.AnyStoragePreference;
import org.apache.airavata.datalake.drms.storage.TransferMapping;
import org.apache.airavata.datalake.orchestrator.Configuration;
import org.apache.airavata.datalake.orchestrator.Utils;
import org.apache.airavata.datalake.orchestrator.connectors.CustosConnector;
import org.apache.airavata.datalake.orchestrator.connectors.DRMSConnector;
import org.apache.airavata.datalake.orchestrator.connectors.WorkflowServiceConnector;
import org.apache.airavata.dataorchestrator.clients.core.NotificationClient;
import org.apache.airavata.mft.api.client.MFTApiClient;
import org.apache.airavata.mft.api.service.DirectoryMetadataResponse;
import org.apache.airavata.mft.api.service.FetchResourceMetadataRequest;
import org.apache.airavata.mft.api.service.FileMetadataResponse;
import org.apache.airavata.mft.api.service.MFTApiServiceGrpc;
import org.apache.airavata.mft.common.AuthToken;
import org.apache.airavata.mft.common.DelegateAuth;
import org.apache.custos.iam.service.UserRepresentation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
public class OrchestratorEventProcessor implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(OrchestratorEventProcessor.class);
private final Notification notification;
private final DRMSConnector drmsConnector;
private final Configuration configuration;
private final WorkflowServiceConnector workflowServiceConnector;
private final Set<String> eventCache;
private final NotificationClient notificationClient;
private final CustosConnector custosConnector;
public OrchestratorEventProcessor(Configuration configuration, Notification notificationEvent,
Set<String> eventCache, NotificationClient notificationClient) throws Exception {
this.notification = notificationEvent;
this.eventCache = eventCache;
this.drmsConnector = new DRMSConnector(configuration);
this.workflowServiceConnector = new WorkflowServiceConnector(configuration);
this.configuration = configuration;
this.notificationClient = notificationClient;
this.custosConnector = new CustosConnector(configuration);
}
private List<GenericResource> createResourceWithParentDirectories(String hostName, String storageId, String basePath,
String resourcePath, String resourceType, String user,
Map<String, GenericResource> resourceCache)
throws Exception {
List<GenericResource> resourceList = new ArrayList<>();
String parentType = "Storage";
String[] splitted = resourcePath.substring(basePath.length()).split("/");
String currentPath = basePath.endsWith("/") ? basePath.substring(0, basePath.length() - 1) : basePath;
String parentId = storageId;
for (int i = 0; i < splitted.length - 1; i++) {
String resourceName = splitted[i];
currentPath = currentPath + "/" + resourceName;
/*if (resourceCache.containsKey(currentPath)) {
resourceList.add(resourceCache.get(currentPath));
parentId = resourceCache.get(currentPath).getResourceId();
logger.info("Using cached resource with path {} for path {}", currentPath,
resourceCache.get(currentPath).getResourcePath());
continue;
}*/
String resourceId = Utils.getId(storageId + ":" + currentPath);
Optional<GenericResource> optionalGenericResource =
this.drmsConnector.createResource(notification.getAuthToken(),
notification.getTenantId(),
resourceId, resourceName, currentPath, parentId, "COLLECTION", parentType, user);
if (optionalGenericResource.isPresent()) {
parentId = optionalGenericResource.get().getResourceId();
parentType = "COLLECTION";
Map<String, String> metadata = new HashMap<>();
metadata.put("resourcePath", currentPath);
metadata.put("hostName", hostName);
this.drmsConnector.addResourceMetadata(notification.getAuthToken(),
notification.getTenantId(), parentId, user, parentType, metadata);
resourceCache.put(currentPath, optionalGenericResource.get());
resourceList.add(optionalGenericResource.get());
} else {
logger.error("Could not create a resource for path {}", currentPath);
throw new Exception("Could not create a resource for path " + currentPath);
}
}
currentPath = currentPath + "/" + splitted[splitted.length - 1];
Optional<GenericResource> optionalGenericResource =
this.drmsConnector.createResource(notification.getAuthToken(),
notification.getTenantId(),
Utils.getId(storageId + ":" + currentPath),
splitted[splitted.length - 1], currentPath,
parentId, resourceType, parentType, user);
if (optionalGenericResource.isPresent()) {
GenericResource genericResource = optionalGenericResource.get();
Map<String, String> metadata = new HashMap<>();
metadata.put("resourcePath", currentPath);
metadata.put("hostName", hostName);
this.drmsConnector.addResourceMetadata(notification.getAuthToken(),
notification.getTenantId(), genericResource.getResourceId(), user, resourceType, metadata);
resourceList.add(genericResource);
} else {
logger.error("Could not create a resource for path {}", currentPath);
throw new Exception("Could not create a resource for path " + currentPath);
}
return resourceList;
}
private void shareResourcesWithUsers(List<GenericResource> resourceList, String admin, String user, String permission) throws Exception {
for (GenericResource resource : resourceList) {
logger.info("Sharing resource {} with path {} with user {}",
resource.getResourceId(), resource.getResourcePath(), user);
this.drmsConnector.shareWithUser(notification.getAuthToken(), notification.getTenantId(),
admin, user, resource.getResourceId(), permission);
}
}
private void shareResourcesWithGroups(List<GenericResource> resourceList, String admin, String group, String permission) throws Exception {
for (GenericResource resource : resourceList) {
logger.info("Sharing resource {} with path {} with group {}",
resource.getResourceId(), resource.getResourcePath(), group);
this.drmsConnector.shareWithGroup(notification.getAuthToken(), notification.getTenantId(),
admin, group, resource.getResourceId(), permission);
}
}
private String verifyUser(String userName) throws Exception {
if (custosConnector.findUserByUserName(userName).isEmpty()) {
Optional<UserRepresentation> userByEmail = custosConnector.findUserByEmail(userName);
if (userByEmail.isPresent()) {
return userByEmail.get().getUsername();
} else {
logger.error("No user {} by email or user name", userName);
throw new Exception("Could not find the user " + userName);
}
} else {
return userName;
}
}
@Override
public void run() {
logger.info("Processing resource path {} on storage {}", notification.getResourcePath(),
notification.getBasePath());
long start = System.currentTimeMillis();
Map<String, GenericResource> resourceCache = new HashMap<>();
try {
this.notificationClient.get().registerNotificationStatus(NotificationStatusRegisterRequest.newBuilder()
.setStatus(NotificationStatus.newBuilder()
.setStatusId(UUID.randomUUID().toString())
.setNotificationId(notification.getNotificationId())
.setStatus(NotificationStatus.StatusType.DATA_ORCH_RECEIVED)
.setDescription("Notification Received")
.setPublishedTime(System.currentTimeMillis())
.build()).build());
if (!"FOLDER".equals(notification.getResourceType())) {
logger.error("Resource {} should be a Folder type but got {}",
notification.getResourcePath(),
notification.getResourceType());
logger.error("Resource should be a Folder type");
}
String removeBasePath = notification.getResourcePath().substring(notification.getBasePath().length());
String[] splitted = removeBasePath.split("/");
if (splitted.length < 2) {
logger.error("Invalid path. Need at least two folder levels from base. {}", removeBasePath);
throw new Exception("Invalid path. Need at least two folder levels from base");
}
String adminUser = verifyUser(splitted[0]);
String owner = verifyUser(splitted[1].split("_")[0]);
Map<String, String> ownerRules = new HashMap<>();
ownerRules.put(adminUser, "VIEWER");
ownerRules.put(splitted[1], "OWNER");
Optional<TransferMapping> optionalTransferMapping = drmsConnector.getActiveTransferMapping(
notification.getAuthToken(),
notification.getTenantId(), adminUser,
notification.getHostName());
if (optionalTransferMapping.isEmpty()) {
logger.error("Could not find a transfer mapping for user {} and host {}", adminUser, notification.getHostName());
throw new Exception("Could not find a transfer mapping");
}
TransferMapping transferMapping = optionalTransferMapping.get();
String sourceStorageId = transferMapping.getSourceStorage().getSshStorage().getStorageId();
String sourceHostName = transferMapping.getSourceStorage().getSshStorage().getHostName();
String destinationStorageId = transferMapping.getDestinationStorage().getSshStorage().getStorageId();
String destinationHostName = transferMapping.getDestinationStorage().getSshStorage().getHostName();
// Creating parent resource
List<GenericResource> resourceList = createResourceWithParentDirectories(sourceHostName, sourceStorageId,
notification.getBasePath(),
notification.getResourcePath(),
"COLLECTION", adminUser, resourceCache);
shareResourcesWithUsers(Collections.singletonList(resourceList.get(resourceList.size() - 1)),
adminUser, owner, "VIEWER");
shareResourcesWithGroups(Collections.singletonList(resourceList.get(0)), adminUser,
configuration.getTenantConfigs().getAdminGroup(),
"EDITOR");
GenericResource resourceObj = resourceList.get(resourceList.size() - 1);
Optional<AnyStoragePreference> sourceSPOp = this.drmsConnector.getStoragePreference(
notification.getAuthToken(), adminUser,
notification.getTenantId(), sourceStorageId);
if (sourceSPOp.isEmpty()) {
logger.error("No storage preference found for source storage {} and user {}", sourceStorageId, adminUser);
throw new Exception("No storage preference found for source storage");
}
Optional<AnyStoragePreference> destSPOp = this.drmsConnector.getStoragePreference(
notification.getAuthToken(), adminUser,
notification.getTenantId(), destinationStorageId);
if (destSPOp.isEmpty()) {
logger.error("No storage preference found for destination storage {} and user {}", sourceStorageId, adminUser);
throw new Exception("No storage preference found for destination storage");
}
AnyStoragePreference sourceSP = sourceSPOp.get();
AnyStoragePreference destSP = destSPOp.get();
String decodedAuth = new String(Base64.getDecoder().decode(notification.getAuthToken()));
String[] authParts = decodedAuth.split(":");
if (authParts.length != 2) {
throw new Exception("Could not decode auth token to work with MFT");
}
DelegateAuth delegateAuth = DelegateAuth.newBuilder()
.setUserId(adminUser)
.setClientId(authParts[0])
.setClientSecret(authParts[1])
.putProperties("TENANT_ID", notification.getTenantId()).build();
AuthToken mftAuth = AuthToken.newBuilder().setDelegateAuth(delegateAuth).build();
List<String> resourceIDsToProcess = new ArrayList<>();
// Fetching file list for parent resource
scanResourceForChildResources(resourceObj, mftAuth, sourceSP, sourceStorageId, sourceHostName,
adminUser, resourceIDsToProcess, resourceCache, 4);
logger.info("Creating destination zip resource for directory {}", notification.getResourcePath());
resourceList = createResourceWithParentDirectories(destinationHostName, destinationStorageId, notification.getBasePath(),
notification.getResourcePath(), "FILE", adminUser, resourceCache);
GenericResource destinationResource = resourceList.get(resourceList.size() - 1);
logger.info("Submitting resources to workflow manager");
this.workflowServiceConnector.invokeWorkflow(notification.getAuthToken(), adminUser,
notification.getTenantId(), resourceIDsToProcess, sourceSP.getSshStoragePreference().getStoragePreferenceId(),
destinationResource.getResourceId(), destSP.getSshStoragePreference().getStoragePreferenceId());
this.notificationClient.get().registerNotificationStatus(NotificationStatusRegisterRequest.newBuilder()
.setStatus(NotificationStatus.newBuilder()
.setStatusId(UUID.randomUUID().toString())
.setNotificationId(notification.getNotificationId())
.setStatus(NotificationStatus.StatusType.DISPATCHED_TO_WORFLOW_ENGING)
.setDescription("Notification successfully processed at the orchestrator. " +
"Sending to workflow manager")
.setPublishedTime(System.currentTimeMillis())
.build()).build());
logger.info("Completed processing path {}. Time taken {} ms",
notification.getResourcePath(), System.currentTimeMillis() - start);
} catch (Exception e) {
logger.error("Failed to process event for resource path {}", notification.getResourcePath(), e);
this.notificationClient.get().registerNotificationStatus(NotificationStatusRegisterRequest.newBuilder()
.setStatus(NotificationStatus.newBuilder()
.setStatusId(UUID.randomUUID().toString())
.setNotificationId(notification.getNotificationId())
.setStatus(NotificationStatus.StatusType.ERRORED)
.setDescription("Notification failed due to : " + e.getMessage())
.setPublishedTime(System.currentTimeMillis())
.build()).build());
} finally {
this.eventCache.remove(notification.getResourcePath() + ":" + notification.getHostName());
}
}
private void scanResourceForChildResources(GenericResource resourceObj, AuthToken mftAuth, AnyStoragePreference sourceSP,
String sourceStorageId, String sourceHostName, String adminUser,
List<String> resourceIDsToProcess, Map<String, GenericResource> resourceCache,
int scanDepth)
throws Exception {
FetchResourceMetadataRequest.Builder resourceMetadataReq = FetchResourceMetadataRequest.newBuilder()
.setMftAuthorizationToken(mftAuth)
.setResourceId(resourceObj.getResourceId());
switch (sourceSP.getStorageCase()) {
case SSH_STORAGE_PREFERENCE:
resourceMetadataReq.setResourceType("SCP");
resourceMetadataReq.setResourceToken(sourceSP.getSshStoragePreference().getStoragePreferenceId());
break;
case S3_STORAGE_PREFERENCE:
resourceMetadataReq.setResourceType("S3");
resourceMetadataReq.setResourceToken(sourceSP.getS3StoragePreference().getStoragePreferenceId());
break;
}
DirectoryMetadataResponse directoryResourceMetadata;
try (MFTApiClient mftApiClient = new MFTApiClient(
this.configuration.getOutboundEventProcessor().getMftHost(),
this.configuration.getOutboundEventProcessor().getMftPort())) {
MFTApiServiceGrpc.MFTApiServiceBlockingStub mftClientStub = mftApiClient.get();
directoryResourceMetadata = mftClientStub.getDirectoryResourceMetadata(resourceMetadataReq.build());
} catch (Exception e) {
logger.error("Failed to fetch dir metadata for resource {} with path {}",
resourceObj.getResourceId(), resourceObj.getResourcePath(), e);
throw e;
}
for (FileMetadataResponse fileMetadata : directoryResourceMetadata.getFilesList()) {
logger.info("Registering file {} for source storage {}", fileMetadata.getResourcePath(), sourceStorageId);
long start = System.currentTimeMillis();
List<GenericResource> resourceList = createResourceWithParentDirectories(sourceHostName, sourceStorageId, notification.getBasePath(),
fileMetadata.getResourcePath(), "FILE", adminUser, resourceCache);
GenericResource fileResource = resourceList.get(resourceList.size() - 1);
resourceIDsToProcess.add(fileResource.getResourceId());
logger.info("Completed registering the file {} for source storage {}. Time taken {} ms",
fileMetadata.getResourcePath(), sourceStorageId, System.currentTimeMillis() - start);
}
for (DirectoryMetadataResponse directoryMetadata : directoryResourceMetadata.getDirectoriesList()) {
logger.info("Registering directory {} for source storage {}", directoryMetadata.getResourcePath(), sourceStorageId);
long start = System.currentTimeMillis();
List<GenericResource> createResources = createResourceWithParentDirectories(sourceHostName, sourceStorageId, notification.getBasePath(),
directoryMetadata.getResourcePath(),
"COLLECTION", adminUser, resourceCache);
GenericResource dirResource = createResources.get(createResources.size() - 1);
if (scanDepth > 0) {
// Scanning the directories recursively
scanResourceForChildResources(dirResource, mftAuth, sourceSP, sourceStorageId, sourceHostName, adminUser,
resourceIDsToProcess, resourceCache, scanDepth - 1);
}
logger.info("Completed registering directory {} for source storage {}. Time taken {} ms",
directoryMetadata.getResourcePath(), sourceStorageId, sourceStorageId, System.currentTimeMillis() - start);
}
}
}