Merge pull request #208 from isururanawaka/service_monitoring
recording unverified resources, users, and invoke workflows when user…
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/Constants.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/Constants.java
new file mode 100644
index 0000000..c1f3d5d
--- /dev/null
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/Constants.java
@@ -0,0 +1,8 @@
+package org.apache.airavata.datalake.orchestrator;
+
+public class Constants {
+
+ public static final String ERROR_CODE_INVALID_TYPE="ERR_0001";
+ public static final String ERROR_CODE_INVALID_USERNAME="ERR_0002";
+ public static final String ERROR_CODE_INVALID_PATH="ERR_0003";
+}
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/DataOrchestratorAPIRunner.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/DataOrchestratorAPIRunner.java
index 7fcefc3..dde3708 100644
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/DataOrchestratorAPIRunner.java
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/DataOrchestratorAPIRunner.java
@@ -61,24 +61,16 @@
LOGGER.info("Starting Data orchestrator API Server ...");
LOGGER.info("Loading configuration from file {} ...", configPath);
- Configuration configuration = this.loadConfig(configPath);
+ Configuration configuration = Utils.loadConfig(configPath);
LOGGER.info("Registering Orchestration even handler " + OrchestratorEventHandler.class.getName() + " ...");
orchestratorEventHandler.init(configuration);
+
LOGGER.info("Data orchestrator start accepting events ....");
orchestratorEventHandler.startProcessing();
}
- private Configuration loadConfig(String filePath) {
- LOGGER.info("File path " + filePath);
- try (InputStream in = new FileInputStream(filePath)) {
- Yaml yaml = new Yaml();
- return yaml.loadAs(in, Configuration.class);
- } catch (Exception exception) {
- LOGGER.error("Error loading config file", exception);
- }
- return null;
- }
+
}
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/Utils.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/Utils.java
index 420c042..79fbb45 100644
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/Utils.java
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/Utils.java
@@ -1,5 +1,11 @@
package org.apache.airavata.datalake.orchestrator;
+import org.yaml.snakeyaml.Yaml;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
@@ -26,4 +32,11 @@
return hexString.toString();
}
+
+ public static Configuration loadConfig(String filePath) throws IOException {
+ try (InputStream in = new FileInputStream(filePath)) {
+ Yaml yaml = new Yaml();
+ return yaml.loadAs(in, Configuration.class);
+ }
+ }
}
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/DRMSConnector.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/DRMSConnector.java
index f943449..1f10703 100644
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/DRMSConnector.java
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/DRMSConnector.java
@@ -194,6 +194,60 @@
}
}
+ public Optional<GenericResource> createUnverifiedResource(String authToken,
+ String tenantId,
+ String resourceId,
+ String resourcePath,
+ String type,
+ String errorCode,
+ String errorDiscription,
+ String unverifiedUser) throws Exception {
+
+ DRMSServiceAuthToken serviceAuthToken = DRMSServiceAuthToken.newBuilder()
+ .setAccessToken(authToken)
+ .setUserUnverified(true)
+ .setAuthCredentialType(AuthCredentialType.AGENT_ACCOUNT_CREDENTIAL)
+ .setAuthenticatedUser(AuthenticatedUser.newBuilder()
+ .setTenantId(tenantId)
+ .build())
+ .build();
+ if (unverifiedUser != null){
+ serviceAuthToken = serviceAuthToken
+ .toBuilder()
+ .setAuthenticatedUser(AuthenticatedUser.newBuilder()
+ .setTenantId(tenantId)
+ .setUsername(unverifiedUser)
+ .build()).build();
+ }
+
+ GenericResource genericResource = GenericResource
+ .newBuilder()
+ .setResourceId(resourceId)
+ .setResourcePath(resourcePath)
+ .setType(type)
+ .putProperties("ERROR_CODE", errorCode)
+ .putProperties("ERROR_DISCRIPTION", errorDiscription)
+ .build();
+ ResourceCreateRequest resourceCreateRequest = ResourceCreateRequest
+ .newBuilder()
+ .setAuthToken(serviceAuthToken)
+ .setResource(genericResource)
+ .build();
+
+ try {
+ ResourceCreateResponse resourceCreateResponse = resourceServiceBlockingStub.createUnverifiedResource(resourceCreateRequest);
+ return Optional.ofNullable(resourceCreateResponse.getResource());
+ } catch (Exception ex) {
+ LOGGER.error("Error occurred while creating unverified resource {} in DRMS", resourcePath, ex);
+ return Optional.empty();
+ }
+ }
+
+
+
+
+
+
public void addResourceMetadata(String authToken,
String tenantId,
String resourceId,
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventHandler.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventHandler.java
index 9b056a5..074d216 100644
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventHandler.java
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventHandler.java
@@ -20,6 +20,7 @@
import org.apache.airavata.datalake.data.orchestrator.api.stub.notification.Notification;
import org.apache.airavata.datalake.data.orchestrator.api.stub.notification.NotificationRegisterRequest;
import org.apache.airavata.datalake.orchestrator.Configuration;
+import org.apache.airavata.datalake.orchestrator.registry.persistance.entity.notification.NotificationEntity;
import org.apache.airavata.dataorchestrator.clients.core.NotificationClient;
import org.apache.airavata.dataorchestrator.messaging.consumer.MessageConsumer;
import org.dozer.DozerBeanMapper;
@@ -94,6 +95,23 @@
}));
}
+
+
+ public void invokeMessageFlowForNotification(Notification notification) throws Exception {
+ try {
+ if (!eventCache.contains(notification.getResourcePath() + ":" + notification.getHostName())) {
+ eventCache.add(notification.getResourcePath() + ":" + notification.getHostName());
+ this.executorService.submit(new OrchestratorEventProcessor(
+ configuration, notification, eventCache, notificationClient));
+ }
+ }catch (Exception e) {
+ LOGGER.error("Failed to submit data orchestrator event to process on path {}",
+ notification.getResourcePath(), e);
+ }
+ }
+
+
+
public Configuration getConfiguration() {
return configuration;
}
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventProcessor.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventProcessor.java
index f6a4e49..e8e0546 100644
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventProcessor.java
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventProcessor.java
@@ -24,6 +24,7 @@
import org.apache.airavata.datalake.drms.storage.AnyStoragePreference;
import org.apache.airavata.datalake.drms.storage.TransferMapping;
import org.apache.airavata.datalake.orchestrator.Configuration;
+import org.apache.airavata.datalake.orchestrator.Constants;
import org.apache.airavata.datalake.orchestrator.Utils;
import org.apache.airavata.datalake.orchestrator.connectors.CustosConnector;
import org.apache.airavata.datalake.orchestrator.connectors.DRMSConnector;
@@ -160,17 +161,16 @@
}
}
- private String verifyUser(String userName) throws Exception {
+ private Optional<String> verifyUser(String userName) throws Exception {
if (custosConnector.findUserByUserName(userName).isEmpty()) {
Optional<UserRepresentation> userByEmail = custosConnector.findUserByEmail(userName);
if (userByEmail.isPresent()) {
- return userByEmail.get().getUsername();
+ return Optional.of(userByEmail.get().getUsername());
} else {
- logger.error("No user {} by email or user name", userName);
- throw new Exception("Could not find the user " + userName);
+ return Optional.empty();
}
} else {
- return userName;
+ return Optional.of(userName);
}
}
@@ -198,6 +198,11 @@
notification.getResourcePath(),
notification.getResourceType());
logger.error("Resource should be a Folder type");
+ this.drmsConnector.createUnverifiedResource(notification.getAuthToken(), notification.getTenantId(),
+ notification.getNotificationId(), notification.getResourcePath(), notification.getResourceType(),
+ Constants.ERROR_CODE_INVALID_TYPE, " Invalid type :" + notification.getResourceType(), null);
+ throw new Exception("Resource should be a Folder type");
+
}
String removeBasePath = notification.getResourcePath().substring(notification.getBasePath().length());
@@ -205,11 +210,31 @@
if (splitted.length < 2) {
logger.error("Invalid path. Need at least two folder levels from base. {}", removeBasePath);
+ this.drmsConnector.createUnverifiedResource(notification.getAuthToken(), notification.getTenantId(),
+ notification.getNotificationId(), notification.getResourcePath(), notification.getResourceType(),
+ Constants.ERROR_CODE_INVALID_PATH, " Invalid path ", null);
throw new Exception("Invalid path. Need at least two folder levels from base");
}
+ String adminUser = null;
+ String owner = null;
- String adminUser = verifyUser(splitted[0]);
- String owner = verifyUser(splitted[1].split("_")[0]);
+ Optional<String> adminUserOp = verifyUser(splitted[0]);
+ Optional<String> ownerOp = verifyUser(splitted[1].split("_")[0]);
+ if (adminUserOp.isEmpty()) {
+ this.drmsConnector.createUnverifiedResource(notification.getAuthToken(), notification.getTenantId(),
+ notification.getNotificationId(), notification.getResourcePath(), notification.getResourceType(),
+ Constants.ERROR_CODE_INVALID_USERNAME, " User not verified ", splitted[0]);
+ logger.error("Invalid user. User should be verified users {} . {}", splitted[0], removeBasePath);
+ throw new Exception("Invalid user. User " + splitted[0] + " should be registered users");
+ }
+ if (ownerOp.isEmpty()) {
+ this.drmsConnector.createUnverifiedResource(notification.getAuthToken(), notification.getTenantId(),
+ notification.getNotificationId(), notification.getResourcePath(), notification.getResourceType(),
+ Constants.ERROR_CODE_INVALID_USERNAME, " User not verified ", splitted[1]);
+ logger.error("Invalid user. User should be verified users {} . {}", splitted[1], removeBasePath);
+ throw new Exception("Invalid user. User " + splitted[1] + " should be registered users");
+ }
+
Map<String, String> ownerRules = new HashMap<>();
ownerRules.put(adminUser, "VIEWER");
@@ -393,4 +418,6 @@
directoryMetadata.getResourcePath(), sourceStorageId, sourceStorageId, System.currentTimeMillis() - start);
}
}
+
+
}
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/grpc/NotificationApiHandler.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/grpc/NotificationApiHandler.java
index 61fbbd1..d6269c7 100644
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/grpc/NotificationApiHandler.java
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/grpc/NotificationApiHandler.java
@@ -17,29 +17,48 @@
package org.apache.airavata.datalake.orchestrator.handlers.grpc;
+import com.google.protobuf.Empty;
+import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import org.apache.airavata.datalake.data.orchestrator.api.stub.notification.*;
+import org.apache.airavata.datalake.orchestrator.Configuration;
+import org.apache.airavata.datalake.orchestrator.Utils;
+import org.apache.airavata.datalake.orchestrator.handlers.async.OrchestratorEventHandler;
import org.apache.airavata.datalake.orchestrator.registry.persistance.entity.notification.NotificationEntity;
import org.apache.airavata.datalake.orchestrator.registry.persistance.entity.notification.NotificationStatusEntity;
import org.apache.airavata.datalake.orchestrator.registry.persistance.repository.NotificationEntityRepository;
import org.apache.airavata.datalake.orchestrator.registry.persistance.repository.NotificationStatusEntityRepository;
import org.dozer.DozerBeanMapper;
import org.lognet.springboot.grpc.GRpcService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
-import java.util.ArrayList;
import java.util.List;
-import java.util.stream.Stream;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
@GRpcService
public class NotificationApiHandler extends NotificationServiceGrpc.NotificationServiceImplBase {
+ private static final Logger LOGGER = LoggerFactory.getLogger(OrchestratorEventHandler.class);
+
@Autowired
private NotificationEntityRepository notificationRepository;
@Autowired
private NotificationStatusEntityRepository notificationStatusRepository;
+
+ @Autowired
+ private OrchestratorEventHandler orchestratorEventHandler;
+
+ @org.springframework.beans.factory.annotation.Value("${config.path}")
+ private String configPath;
+
+
@Override
public void registerNotification(NotificationRegisterRequest request, StreamObserver<NotificationRegisterResponse> responseObserver) {
DozerBeanMapper mapper = new DozerBeanMapper();
@@ -99,4 +118,40 @@
responseObserver.onNext(responseBuilder.build());
responseObserver.onCompleted();
}
+
+
+ @Override
+ public void invokeNotification(NotificationInvokeRequest request, StreamObserver<NotificationInvokeResponse> responseObserver) {
+ try {
+ Optional<NotificationEntity> notificationOP = notificationRepository.findById(request.getNotificationId());
+
+ if (notificationOP.isPresent()) {
+ NotificationEntity notificationEntity = notificationOP.get();
+
+
+ Notification notification = Notification
+ .newBuilder()
+ .setNotificationId(UUID.randomUUID().toString())
+ .setBasePath(notificationEntity.getBasePath())
+ .setResourcePath(notificationEntity.getResourcePath())
+ .setAuthToken(notificationEntity.getAuthToken())
+ .setEventType(Notification.NotificationType.valueOf(notificationEntity.getResourceType()))
+ .setHostName(notificationEntity.getHostName())
+ .setOccuredTime(notificationEntity.getOccuredTime())
+ .setTenantId(notificationEntity.getTenantId())
+ .setResourceType(notificationEntity.getResourceType()).build();
+
+ Configuration configuration = Utils.loadConfig(configPath);
+ orchestratorEventHandler.init(configuration);
+ orchestratorEventHandler.invokeMessageFlowForNotification(notification);
+ responseObserver.onNext(NotificationInvokeResponse.newBuilder().setStatus(true).build());
+ responseObserver.onCompleted();
+ }
+ } catch (Exception ex) {
+ String msg = "Notification invocation failed for id " + request.getNotificationId();
+ LOGGER.error(msg, ex);
+ responseObserver.onError(Status.INTERNAL.withDescription(msg).asRuntimeException());
+ }
+
+ }
}
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/resources/application.properties b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/resources/application.properties
index 88dd260..606cc01 100644
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/resources/application.properties
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/resources/application.properties
@@ -17,4 +17,4 @@
mft.host=localhost
mft.port=6565
drms.host=localhost
-drms.port=7070
\ No newline at end of file
+drms.port=7070
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-stub/src/main/proto/notification.proto b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-stub/src/main/proto/notification.proto
index 2a019fb..bef56ed 100644
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-stub/src/main/proto/notification.proto
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-stub/src/main/proto/notification.proto
@@ -73,6 +73,14 @@
Notification notification = 1;
}
+message NotificationInvokeRequest {
+ string notificationId = 1;
+}
+
+message NotificationInvokeResponse {
+ bool status = 1;
+}
+
message NotificationRegisterResponse {
}
@@ -109,6 +117,12 @@
};
}
+ rpc invokeNotification (NotificationInvokeRequest) returns (NotificationInvokeResponse) {
+ option (google.api.http) = {
+ post: "/v1.0/api/dataorch/notification/invoke"
+ };
+ }
+
rpc listNotifications (NotificationListRequest) returns (NotificationListResponse) {
option (google.api.http) = {
get: "/v1.0/api/dataorch/notifications"
diff --git a/data-resource-management-service/drms-rdbms-impl/drms-server/pom.xml b/data-resource-management-service/drms-rdbms-impl/drms-server/pom.xml
index c8b3080..66c2ab3 100644
--- a/data-resource-management-service/drms-rdbms-impl/drms-server/pom.xml
+++ b/data-resource-management-service/drms-rdbms-impl/drms-server/pom.xml
@@ -153,6 +153,17 @@
<artifactId>drms-core</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.airavata.data.lake</groupId>
+ <artifactId>data-orchestrator-api-stub</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.airavata.data.lake</groupId>
+ <artifactId>data-orchestrator-clients-core</artifactId>
+ <version>0.01-SNAPSHOT</version>
+ <scope>compile</scope>
+ </dependency>
</dependencies>
diff --git a/data-resource-management-service/drms-rdbms-impl/drms-server/src/main/java/org/apache/airavata/drms/api/handlers/ResourceServiceHandler.java b/data-resource-management-service/drms-rdbms-impl/drms-server/src/main/java/org/apache/airavata/drms/api/handlers/ResourceServiceHandler.java
index ddb7438..7b47784 100644
--- a/data-resource-management-service/drms-rdbms-impl/drms-server/src/main/java/org/apache/airavata/drms/api/handlers/ResourceServiceHandler.java
+++ b/data-resource-management-service/drms-rdbms-impl/drms-server/src/main/java/org/apache/airavata/drms/api/handlers/ResourceServiceHandler.java
@@ -22,17 +22,22 @@
import com.google.protobuf.util.JsonFormat;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
+import org.apache.airavata.datalake.data.orchestrator.api.stub.notification.NotificationInvokeRequest;
+import org.apache.airavata.datalake.data.orchestrator.api.stub.notification.NotificationInvokeResponse;
import org.apache.airavata.datalake.drms.AuthenticatedUser;
import org.apache.airavata.datalake.drms.resource.GenericResource;
import org.apache.airavata.datalake.drms.storage.*;
+import org.apache.airavata.dataorchestrator.clients.core.NotificationClient;
import org.apache.airavata.drms.api.persistance.mapper.ResourceMapper;
import org.apache.airavata.drms.api.persistance.mapper.StorageMapper;
import org.apache.airavata.drms.api.persistance.model.Resource;
import org.apache.airavata.drms.api.persistance.model.ResourceProperty;
import org.apache.airavata.drms.api.persistance.model.TransferMapping;
+import org.apache.airavata.drms.api.persistance.model.UnverifiedResource;
import org.apache.airavata.drms.api.persistance.repository.ResourcePropertyRepository;
import org.apache.airavata.drms.api.persistance.repository.ResourceRepository;
import org.apache.airavata.drms.api.persistance.repository.TransferMappingRepository;
+import org.apache.airavata.drms.api.persistance.repository.UnverifiedResourceRepository;
import org.apache.airavata.drms.api.utils.CustosUtils;
import org.apache.airavata.drms.core.constants.SharingConstants;
import org.apache.airavata.drms.core.constants.StorageConstants;
@@ -42,12 +47,15 @@
import org.apache.custos.sharing.core.SearchCondition;
import org.apache.custos.sharing.core.SearchCriteria;
import org.apache.custos.sharing.management.client.SharingManagementClient;
-import org.apache.custos.sharing.service.*;
+import org.apache.custos.sharing.service.Entities;
+import org.apache.custos.sharing.service.SearchRequest;
import org.json.JSONObject;
import org.lognet.springboot.grpc.GRpcService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.domain.Page;
+import org.springframework.data.domain.PageRequest;
import java.io.IOException;
import java.util.*;
@@ -71,6 +79,15 @@
@Autowired
private TransferMappingRepository transferMappingRepository;
+ @Autowired
+ private UnverifiedResourceRepository unverifiedResourceRepository;
+
+ @org.springframework.beans.factory.annotation.Value("${orch.host}")
+ private String orchHost;
+
+ @org.springframework.beans.factory.annotation.Value("${orch.port}")
+ private int orchPort;
+
@Override
public void fetchResource(ResourceFetchRequest request, StreamObserver<ResourceFetchResponse> responseObserver) {
@@ -152,7 +169,7 @@
callUser.getUsername());
if (exEntity.isPresent()) {
- Resource resource = ResourceMapper.map(request.getResource(),null, exEntity.get(), callUser);
+ Resource resource = ResourceMapper.map(request.getResource(), null, exEntity.get(), callUser);
resource.setResourceType(type);
resource.setParentResourceId(parentId);
resourceRepository.save(resource);
@@ -256,37 +273,37 @@
String entityId = request.getResource().getResourceId();
String name = request.getResource().getResourceName();
- Optional<Resource> exResource = resourceRepository.findById(entityId);
- if (exResource.isPresent()) {
- List<ResourceProperty> resourceProperties = resourcePropertyRepository.
- findByPropertyKeyAndResourceId("owner",exResource.get().getId());
- if (parentId == null|| parentId.isEmpty())
- parentId = exResource.get().getParentResourceId();
- if (!resourceProperties.isEmpty()) {
- Optional<Entity> exEntity = CustosUtils.mergeResourceEntity(custosClientProvider, callUser.getTenantId(),
- parentId, type, entityId,
- request.getResource().getResourceName(), request.getResource().getResourceName(),
- resourceProperties.get(0).getPropertyValue());
+ Optional<Resource> exResource = resourceRepository.findById(entityId);
+ if (exResource.isPresent()) {
+ List<ResourceProperty> resourceProperties = resourcePropertyRepository.
+ findByPropertyKeyAndResourceId("owner", exResource.get().getId());
+ if (parentId == null || parentId.isEmpty())
+ parentId = exResource.get().getParentResourceId();
+ if (!resourceProperties.isEmpty()) {
+ Optional<Entity> exEntity = CustosUtils.mergeResourceEntity(custosClientProvider, callUser.getTenantId(),
+ parentId, type, entityId,
+ request.getResource().getResourceName(), request.getResource().getResourceName(),
+ resourceProperties.get(0).getPropertyValue());
- if (exEntity.isPresent()) {
- Resource resource = ResourceMapper.map(request.getResource(), exResource.get(),exEntity.get(), callUser);
- resource.setResourceType(type);
- resource.setParentResourceId(parentId);
+ if (exEntity.isPresent()) {
+ Resource resource = ResourceMapper.map(request.getResource(), exResource.get(), exEntity.get(), callUser);
+ resource.setResourceType(type);
+ resource.setParentResourceId(parentId);
- resourceRepository.save(resource);
+ resourceRepository.save(resource);
- GenericResource genericResource = ResourceMapper.map(resource, exEntity.get());
+ GenericResource genericResource = ResourceMapper.map(resource, exEntity.get());
- ResourceUpdateResponse response = ResourceUpdateResponse
- .newBuilder()
- .setResource(genericResource)
- .build();
- responseObserver.onNext(response);
- responseObserver.onCompleted();
- return;
- }
- }
- }
+ ResourceUpdateResponse response = ResourceUpdateResponse
+ .newBuilder()
+ .setResource(genericResource)
+ .build();
+ responseObserver.onNext(response);
+ responseObserver.onCompleted();
+ return;
+ }
+ }
+ }
//TODO: Error
@@ -307,6 +324,8 @@
request, StreamObserver<ResourceSearchResponse> responseObserver) {
AuthenticatedUser callUser = request.getAuthToken().getAuthenticatedUser();
+ invokeUnVerifiedResourceRegistrationWorkflow(callUser.getUsername());
+
List<ResourceSearchQuery> resourceSearchQueries = request.getQueriesList();
SearchRequest.Builder searchRequestBuilder = SearchRequest.newBuilder();
@@ -341,7 +360,7 @@
Optional<TransferMapping> transferMappingOptional = transferMappingRepository.
findTransferMappingByScope(TransferScope.GLOBAL.name());
- if (transferMappingOptional.isPresent() && searchMap.isEmpty() && !type.equalsIgnoreCase("COLLECTION_GROUP")) {
+ if (transferMappingOptional.isPresent() && searchMap.isEmpty() && !type.equalsIgnoreCase("COLLECTION_GROUP")) {
TransferMapping transferMapping = transferMappingOptional.get();
String sourceId = transferMapping.getSource().getId();
@@ -412,34 +431,34 @@
AuthenticatedUser callUser = request.getAuthToken().getAuthenticatedUser();
GenericResource resource = request.getParentResource();
List<GenericResource> childResources = request.getChildResourcesList();
- childResources.forEach(childResource-> {
+ childResources.forEach(childResource -> {
- List<ResourceProperty> resourceProperties = resourcePropertyRepository.
- findByPropertyKeyAndResourceId("resourceName",childResource.getResourceId());
- Optional<Resource> exRes = resourceRepository.findById(childResource.getResourceId());
- try{
- if(!resourceProperties.isEmpty() && exRes.isPresent()) {
- CustosUtils.mergeResourceEntity(custosClientProvider, callUser.getTenantId(),
- resource.getResourceId(), childResource.getType(), childResource.getResourceId(),
- resourceProperties.get(0).getPropertyValue(), resourceProperties.get(0).getPropertyValue(),
- callUser.getUsername());
- Resource chResource = exRes.get();
- chResource.setParentResourceId(resource.getResourceId());
- resourceRepository.save(chResource);
+ List<ResourceProperty> resourceProperties = resourcePropertyRepository.
+ findByPropertyKeyAndResourceId("resourceName", childResource.getResourceId());
+ Optional<Resource> exRes = resourceRepository.findById(childResource.getResourceId());
+ try {
+ if (!resourceProperties.isEmpty() && exRes.isPresent()) {
+ CustosUtils.mergeResourceEntity(custosClientProvider, callUser.getTenantId(),
+ resource.getResourceId(), childResource.getType(), childResource.getResourceId(),
+ resourceProperties.get(0).getPropertyValue(), resourceProperties.get(0).getPropertyValue(),
+ callUser.getUsername());
+ Resource chResource = exRes.get();
+ chResource.setParentResourceId(resource.getResourceId());
+ resourceRepository.save(chResource);
- }
- } catch (IOException e) {
- String msg = " Error occurred while adding child memberships " + e.getMessage();
- logger.error(" Error occurred while adding child memberships: Messages {} ", e.getMessage(), e);
}
- });
- OperationStatusResponse operationStatusResponse = OperationStatusResponse
- .newBuilder().
- setStatus(true)
- .build();
- responseObserver.onNext(operationStatusResponse);
- responseObserver.onCompleted();
+ } catch (IOException e) {
+ String msg = " Error occurred while adding child memberships " + e.getMessage();
+ logger.error(" Error occurred while adding child memberships: Messages {} ", e.getMessage(), e);
+ }
+ });
+ OperationStatusResponse operationStatusResponse = OperationStatusResponse
+ .newBuilder().
+ setStatus(true)
+ .build();
+ responseObserver.onNext(operationStatusResponse);
+ responseObserver.onCompleted();
} catch (Exception e) {
String msg = " Error occurred while adding child memberships " + e.getMessage();
@@ -457,16 +476,16 @@
AuthenticatedUser callUser = request.getAuthToken().getAuthenticatedUser();
GenericResource resource = request.getParentResource();
List<GenericResource> childResources = request.getChildResourcesList();
- childResources.forEach(childResource-> {
- List<ResourceProperty> resourceProperties = resourcePropertyRepository.findByPropertyKeyAndResourceId("resourceName",childResource.getResourceId());
- Optional<Resource> exRes = resourceRepository.findById(childResource.getResourceId());
+ childResources.forEach(childResource -> {
+ List<ResourceProperty> resourceProperties = resourcePropertyRepository.findByPropertyKeyAndResourceId("resourceName", childResource.getResourceId());
+ Optional<Resource> exRes = resourceRepository.findById(childResource.getResourceId());
try {
- if(!resourceProperties.isEmpty() && exRes.isPresent()) {
+ if (!resourceProperties.isEmpty() && exRes.isPresent()) {
CustosUtils.mergeResourceEntity(custosClientProvider, callUser.getTenantId(),
"", childResource.getType(), childResource.getResourceId(),
resourceProperties.get(0).getPropertyValue(), resourceProperties.get(0).getPropertyValue(),
callUser.getUsername());
- Resource chResource = exRes.get();
+ Resource chResource = exRes.get();
chResource.setParentResourceId(null);
resourceRepository.save(chResource);
}
@@ -643,6 +662,92 @@
}
+
+ @Override
+ public void fetchUnverifiedResources(ResourceSearchRequest request, StreamObserver<ResourceSearchResponse> responseObserver) {
+ try {
+ PageRequest pageRequest = PageRequest.of(request.getOffset(), request.getLimit());
+ Page<UnverifiedResource> resources = unverifiedResourceRepository.findAll(pageRequest);
+
+ ResourceSearchResponse.Builder response = ResourceSearchResponse.newBuilder();
+
+ Map<String, String> props = new HashMap<>();
+
+
+ resources.forEach(resource -> {
+ Map<String, String> prop = new HashMap<>();
+ prop.put("ERROR_CODE", resource.getErrorCode());
+ prop.put("ERROR_DISCRIPTION", resource.getErrorDiscription());
+ response.addResources(GenericResource.newBuilder()
+ .setResourceId(resource.getId())
+ .setResourcePath(resource.getPath())
+ .setType(resource.getType())
+ .putAllProperties(prop).build());
+ });
+
+ responseObserver.onNext(response.build());
+ responseObserver.onCompleted();
+ } catch (Exception ex) {
+ String msg = " Error occurred while fetching unverified resources " + ex.getMessage();
+ logger.error(" Error occurred while fetching unverified resources {} ", ex.getMessage(), ex);
+ responseObserver.onError(Status.INTERNAL.withDescription(msg).asRuntimeException());
+ }
+
+ }
+
+ @Override
+ public void createUnverifiedResource(ResourceCreateRequest request, StreamObserver<ResourceCreateResponse> responseObserver) {
+ try {
+ AuthenticatedUser callUser = request.getAuthToken().getAuthenticatedUser();
+ String type = request.getResource().getType();
+ String entityId = request.getResource().getResourceId();
+ String path = request.getResource().getResourcePath();
+ String tenantId = request.getAuthToken().getAuthenticatedUser().getTenantId();
+ String errorCode = request.getResource().getPropertiesMap().get("ERROR_CODE");
+ String errorDescription = request.getResource().getPropertiesMap().get("ERROR_DESCRIPTION");
+
+
+ UnverifiedResource unverifiedResource = new UnverifiedResource();
+ unverifiedResource.setId(entityId);
+ unverifiedResource.setPath(path);
+ unverifiedResource.setErrorCode(errorCode);
+ unverifiedResource.setErrorDiscription(errorDescription);
+ unverifiedResource.setTenantId(tenantId);
+ unverifiedResource.setType(type);
+
+ if (!callUser.getUsername().isEmpty()) {
+ unverifiedResource.setUnverifiedAssociatedOwner(callUser.getUsername());
+ }
+
+ unverifiedResourceRepository.save(unverifiedResource);
+
+ ResourceCreateResponse response = ResourceCreateResponse
+ .newBuilder()
+ .setResource(request.getResource())
+ .build();
+ responseObserver.onNext(response);
+ responseObserver.onCompleted();
+ return;
+
+ } catch (Exception ex) {
+ logger.error("Error occurred while creating unverified resource {}", request.getResource().getResourceId(), ex);
+ String msg = "Error occurred while creating unverified resource" + ex.getMessage();
+ responseObserver.onError(Status.INTERNAL.withDescription(msg).asRuntimeException());
+ }
+ }
+
+ @Override
+ public void deleteUnverifiedResource(ResourceUpdateRequest request, StreamObserver<ResourceUpdateResponse> responseObserver) {
+ try {
+ unverifiedResourceRepository.deleteById(request.getResourceId());
+ } catch (Exception ex) {
+
+ logger.error("Error occurred while deleting unverified resource {}", request.getResource().getResourceId(), ex);
+ responseObserver.onError(Status.INTERNAL.
+ withDescription("Error occurred while deleting unverified resource {}" + request.getResource().getResourceId()).asRuntimeException());
+ }
+ }
+
private Set<ResourceProperty> mergeProperties(Resource resource, Map<String, Object> values) {
Set<ResourceProperty> exProperties = resource.getResourceProperty();
@@ -678,4 +783,30 @@
return newProperties;
}
+
+ private void invokeUnVerifiedResourceRegistrationWorkflow(String username) {
+ List<UnverifiedResource> unverifiedResources = unverifiedResourceRepository
+ .getUnverifiedResourceByUnverifiedAssociatedOwnerAndErrorCode(username, "ERR_0002");
+
+ NotificationClient notificationClient = new NotificationClient(
+ orchHost, orchPort);
+
+ if (!unverifiedResources.isEmpty()) {
+
+ for (UnverifiedResource unverifiedResource : unverifiedResources) {
+
+ NotificationInvokeResponse response = notificationClient.get()
+ .invokeNotification(NotificationInvokeRequest
+ .newBuilder()
+ .setNotificationId(unverifiedResource.getId())
+ .build());
+ if (response.getStatus()) {
+ unverifiedResourceRepository.deleteById(unverifiedResource.getId());
+ }
+
+ }
+ }
+
+ }
+
}
diff --git a/data-resource-management-service/drms-rdbms-impl/drms-server/src/main/java/org/apache/airavata/drms/api/interceptors/Authenticator.java b/data-resource-management-service/drms-rdbms-impl/drms-server/src/main/java/org/apache/airavata/drms/api/interceptors/Authenticator.java
index aa3007f..beb9564 100644
--- a/data-resource-management-service/drms-rdbms-impl/drms-server/src/main/java/org/apache/airavata/drms/api/interceptors/Authenticator.java
+++ b/data-resource-management-service/drms-rdbms-impl/drms-server/src/main/java/org/apache/airavata/drms/api/interceptors/Authenticator.java
@@ -120,7 +120,7 @@
String tenantId = drmsServiceAuthToken.getAuthenticatedUser().getTenantId();
Struct struct = identityManagementClient
.getAgentToken(tenantId, agentClientId, agentClientSec, "client_credentials", "");
- if (struct.getFieldsMap().get("access_token").isInitialized()) {
+ if (struct.getFieldsMap().get("access_token").isInitialized() && ! drmsServiceAuthToken.getUserUnverified()) {
UserRepresentation user = userManagementClient.getUser(username, tenantId);
return Optional.ofNullable(AuthenticatedUser.newBuilder()
.setUsername(user.getUsername())
@@ -129,6 +129,14 @@
.setEmailAddress(user.getEmail())
.setTenantId(tenantId)
.build());
+ } else {
+ AuthenticatedUser user = AuthenticatedUser.newBuilder()
+ .setTenantId(tenantId)
+ .build();
+ if (username == null || username.isEmpty()) {
+ user = user.toBuilder().setUsername(username).build();
+ }
+ return Optional.ofNullable(user);
}
}
} else {
diff --git a/data-resource-management-service/drms-rdbms-impl/drms-server/src/main/java/org/apache/airavata/drms/api/persistance/model/UnverifiedResource.java b/data-resource-management-service/drms-rdbms-impl/drms-server/src/main/java/org/apache/airavata/drms/api/persistance/model/UnverifiedResource.java
new file mode 100644
index 0000000..b7b44ae
--- /dev/null
+++ b/data-resource-management-service/drms-rdbms-impl/drms-server/src/main/java/org/apache/airavata/drms/api/persistance/model/UnverifiedResource.java
@@ -0,0 +1,91 @@
+package org.apache.airavata.drms.api.persistance.model;
+
+
+import org.springframework.data.jpa.domain.support.AuditingEntityListener;
+
+import javax.persistence.*;
+
+@Entity
+@Table(name = "UNVERIFIED_RESOURCE")
+@EntityListeners(AuditingEntityListener.class)
+public class UnverifiedResource {
+
+ @Id
+ @Column(name="ID")
+ private String id;
+
+ @Column(name="TENANT_ID",nullable = false)
+ private String tenantId;
+
+ @Column(name="RESOURCE_PATH")
+ private String path;
+
+ @Column(name="ERROR_CODE")
+ private String errorCode;
+
+ @Column(name="ERROR_DISCRIPTION")
+ private String errorDiscription;
+
+ @Column(name="TYPE")
+ private String type;
+
+ @Column(name="UNVERIFIED_OWNER")
+ private String unverifiedAssociatedOwner;
+
+
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public String getTenantId() {
+ return tenantId;
+ }
+
+ public void setTenantId(String tenantId) {
+ this.tenantId = tenantId;
+ }
+
+ public String getPath() {
+ return path;
+ }
+
+ public void setPath(String path) {
+ this.path = path;
+ }
+
+ public String getErrorCode() {
+ return errorCode;
+ }
+
+ public void setErrorCode(String errorCode) {
+ this.errorCode = errorCode;
+ }
+
+ public String getErrorDiscription() {
+ return errorDiscription;
+ }
+
+ public void setErrorDiscription(String errorDiscription) {
+ this.errorDiscription = errorDiscription;
+ }
+
+ public String getType() {
+ return type;
+ }
+
+ public void setType(String type) {
+ this.type = type;
+ }
+
+ public String getUnverifiedAssociatedOwner() {
+ return unverifiedAssociatedOwner;
+ }
+
+ public void setUnverifiedAssociatedOwner(String unverifiedAssociatedOwner) {
+ this.unverifiedAssociatedOwner = unverifiedAssociatedOwner;
+ }
+}
diff --git a/data-resource-management-service/drms-rdbms-impl/drms-server/src/main/java/org/apache/airavata/drms/api/persistance/repository/UnverifiedResourceRepository.java b/data-resource-management-service/drms-rdbms-impl/drms-server/src/main/java/org/apache/airavata/drms/api/persistance/repository/UnverifiedResourceRepository.java
new file mode 100644
index 0000000..2ad5877
--- /dev/null
+++ b/data-resource-management-service/drms-rdbms-impl/drms-server/src/main/java/org/apache/airavata/drms/api/persistance/repository/UnverifiedResourceRepository.java
@@ -0,0 +1,15 @@
+package org.apache.airavata.drms.api.persistance.repository;
+
+import org.apache.airavata.drms.api.persistance.model.Resource;
+import org.apache.airavata.drms.api.persistance.model.UnverifiedResource;
+import org.springframework.data.jpa.repository.JpaRepository;
+import org.springframework.data.jpa.repository.Query;
+
+import java.util.List;
+
+public interface UnverifiedResourceRepository extends JpaRepository<UnverifiedResource, String> {
+
+
+ public List<UnverifiedResource> getUnverifiedResourceByUnverifiedAssociatedOwnerAndErrorCode(String username,String errorCode);
+
+}
diff --git a/data-resource-management-service/drms-stubs/src/main/proto/Common.proto b/data-resource-management-service/drms-stubs/src/main/proto/Common.proto
index 604c1fd..6bcfeea 100644
--- a/data-resource-management-service/drms-stubs/src/main/proto/Common.proto
+++ b/data-resource-management-service/drms-stubs/src/main/proto/Common.proto
@@ -35,6 +35,7 @@
string access_token = 1;
AuthenticatedUser authenticated_user = 2;
AuthCredentialType auth_credential_type =3;
+ bool user_unverified =4;
}
enum AuthCredentialType {
diff --git a/data-resource-management-service/drms-stubs/src/main/proto/resource/DRMSResource.proto b/data-resource-management-service/drms-stubs/src/main/proto/resource/DRMSResource.proto
index c696cb0..b6eb64d 100644
--- a/data-resource-management-service/drms-stubs/src/main/proto/resource/DRMSResource.proto
+++ b/data-resource-management-service/drms-stubs/src/main/proto/resource/DRMSResource.proto
@@ -36,4 +36,5 @@
string parent_resource_path = 7;
string resource_name = 8;
string parent_id = 9;
+
}
\ No newline at end of file
diff --git a/data-resource-management-service/drms-stubs/src/main/proto/resource/DRMSResourceService.proto b/data-resource-management-service/drms-stubs/src/main/proto/resource/DRMSResourceService.proto
index f74f21e..2d886bc 100644
--- a/data-resource-management-service/drms-stubs/src/main/proto/resource/DRMSResourceService.proto
+++ b/data-resource-management-service/drms-stubs/src/main/proto/resource/DRMSResourceService.proto
@@ -86,6 +86,8 @@
repeated ResourceSearchQuery queries = 2;
int32 depth = 4;
string type = 5;
+ int32 offset=6;
+ int32 limit=7;
}
message ResourceSearchResponse {
@@ -222,5 +224,23 @@
};
}
+ rpc fetchUnverifiedResources (ResourceSearchRequest) returns (ResourceSearchResponse) {
+ option (google.api.http) = {
+ get: "/v1.0/api/drms/resources/unverified"
+ };
+ }
+
+ rpc createUnverifiedResource (ResourceCreateRequest) returns (ResourceCreateResponse) {
+ option (google.api.http) = {
+ post: "/v1.0/api/drms/resource/unverified"
+ };
+ }
+
+ rpc deleteUnverifiedResource (ResourceUpdateRequest) returns (ResourceUpdateResponse) {
+ option (google.api.http) = {
+ delete: "/v1.0/api/drms/resource/unverified"
+ };
+ }
+
}
\ No newline at end of file