Merge pull request #208 from isururanawaka/service_monitoring

Record unverified resources and users, and invoke workflows when user…
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/Constants.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/Constants.java
new file mode 100644
index 0000000..c1f3d5d
--- /dev/null
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/Constants.java
@@ -0,0 +1,21 @@
+package org.apache.airavata.datalake.orchestrator;
+
+/**
+ * Error codes recorded on resources that the orchestrator registers as unverified
+ * because a notification failed validation.
+ */
+public final class Constants {
+
+    /** The notified resource does not have the expected resource type. */
+    public static final String ERROR_CODE_INVALID_TYPE = "ERR_0001";
+
+    /** A user name taken from the resource path could not be verified. */
+    public static final String ERROR_CODE_INVALID_USERNAME = "ERR_0002";
+
+    /** The resource path has fewer folder levels below the base path than required. */
+    public static final String ERROR_CODE_INVALID_PATH = "ERR_0003";
+
+    private Constants() {
+        // Holder for constants only; never instantiated.
+    }
+}
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/DataOrchestratorAPIRunner.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/DataOrchestratorAPIRunner.java
index 7fcefc3..dde3708 100644
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/DataOrchestratorAPIRunner.java
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/DataOrchestratorAPIRunner.java
@@ -61,24 +61,16 @@
         LOGGER.info("Starting Data orchestrator API Server ...");
 
         LOGGER.info("Loading configuration from file {} ...", configPath);
-        Configuration configuration = this.loadConfig(configPath);
+        Configuration configuration = Utils.loadConfig(configPath);
 
         LOGGER.info("Registering Orchestration even handler " + OrchestratorEventHandler.class.getName() + " ...");
         orchestratorEventHandler.init(configuration);
+
         LOGGER.info("Data orchestrator start accepting  events ....");
         orchestratorEventHandler.startProcessing();
     }
 
 
-    private Configuration loadConfig(String filePath) {
-        LOGGER.info("File path " + filePath);
-        try (InputStream in = new FileInputStream(filePath)) {
-            Yaml yaml = new Yaml();
-            return yaml.loadAs(in, Configuration.class);
-        } catch (Exception exception) {
-            LOGGER.error("Error loading config file", exception);
-        }
-        return null;
-    }
+
 
 }
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/Utils.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/Utils.java
index 420c042..79fbb45 100644
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/Utils.java
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/Utils.java
@@ -1,5 +1,11 @@
 package org.apache.airavata.datalake.orchestrator;
 
+import org.yaml.snakeyaml.Yaml;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
 import java.math.BigInteger;
 import java.nio.charset.StandardCharsets;
 import java.security.MessageDigest;
@@ -26,4 +32,23 @@
 
         return hexString.toString();
     }
+
+    /**
+     * Loads the orchestrator configuration from a YAML file.
+     *
+     * @param filePath path of the YAML configuration file
+     * @return the configuration parsed from the file, never {@code null}
+     * @throws IOException if the file cannot be opened or closed, or if it contains no YAML document
+     */
+    public static Configuration loadConfig(String filePath) throws IOException {
+        try (InputStream in = new FileInputStream(filePath)) {
+            Configuration configuration = new Yaml().loadAs(in, Configuration.class);
+            if (configuration == null) {
+                // SnakeYAML returns null for an empty document; fail here rather than
+                // hand callers a null configuration.
+                throw new IOException("Configuration file " + filePath + " is empty");
+            }
+            return configuration;
+        }
+    }
 }
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/DRMSConnector.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/DRMSConnector.java
index f943449..1f10703 100644
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/DRMSConnector.java
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/DRMSConnector.java
@@ -194,6 +194,65 @@
         }
     }
 
+    /**
+     * Registers a resource in DRMS as unverified, tagging it with the error that made it fail
+     * validation.
+     *
+     * @param authToken        access token placed in the DRMS service auth token
+     * @param tenantId         tenant set on the authenticated user of the request
+     * @param resourceId       id given to the unverified resource
+     * @param resourcePath     path of the resource
+     * @param type             resource type
+     * @param errorCode        value stored under the {@code ERROR_CODE} property
+     * @param errorDiscription value stored under the {@code ERROR_DISCRIPTION} property
+     * @param unverifiedUser   user name to attach to the request, or {@code null} for none
+     * @return the resource returned by DRMS, or empty if the DRMS call failed
+     * @throws Exception declared for callers; a failure of the DRMS call itself is logged, not thrown
+     */
+    public Optional<GenericResource> createUnverifiedResource(String authToken,
+                                                              String tenantId,
+                                                              String resourceId,
+                                                              String resourcePath,
+                                                              String type,
+                                                              String errorCode,
+                                                              String errorDiscription,
+                                                              String unverifiedUser) throws Exception {
+
+        // The user name is optional, so it is only set when the caller supplied one.
+        AuthenticatedUser.Builder requestUser = AuthenticatedUser.newBuilder().setTenantId(tenantId);
+        if (unverifiedUser != null) {
+            requestUser.setUsername(unverifiedUser);
+        }
+
+        DRMSServiceAuthToken unverifiedToken = DRMSServiceAuthToken.newBuilder()
+                .setAccessToken(authToken)
+                .setUserUnverified(true)
+                .setAuthCredentialType(AuthCredentialType.AGENT_ACCOUNT_CREDENTIAL)
+                .setAuthenticatedUser(requestUser.build())
+                .build();
+
+        GenericResource unverifiedResource = GenericResource.newBuilder()
+                .setResourceId(resourceId)
+                .setResourcePath(resourcePath)
+                .setType(type)
+                .putProperties("ERROR_CODE", errorCode)
+                .putProperties("ERROR_DISCRIPTION", errorDiscription)
+                .build();
+
+        ResourceCreateRequest createRequest = ResourceCreateRequest.newBuilder()
+                .setAuthToken(unverifiedToken)
+                .setResource(unverifiedResource)
+                .build();
+
+        try {
+            return Optional.ofNullable(
+                    resourceServiceBlockingStub.createUnverifiedResource(createRequest).getResource());
+        } catch (Exception ex) {
+            LOGGER.error("Error occurred while creating unverified resource {} in DRMS", resourcePath, ex);
+            return Optional.empty();
+        }
+    }
+
     public void addResourceMetadata(String authToken,
                                     String tenantId,
                                     String resourceId,
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventHandler.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventHandler.java
index 9b056a5..074d216 100644
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventHandler.java
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventHandler.java
@@ -20,6 +20,7 @@
 import org.apache.airavata.datalake.data.orchestrator.api.stub.notification.Notification;
 import org.apache.airavata.datalake.data.orchestrator.api.stub.notification.NotificationRegisterRequest;
 import org.apache.airavata.datalake.orchestrator.Configuration;
+import org.apache.airavata.datalake.orchestrator.registry.persistance.entity.notification.NotificationEntity;
 import org.apache.airavata.dataorchestrator.clients.core.NotificationClient;
 import org.apache.airavata.dataorchestrator.messaging.consumer.MessageConsumer;
 import org.dozer.DozerBeanMapper;
@@ -94,6 +95,36 @@
         }));
     }
 
+    /**
+     * Submits a notification for asynchronous processing unless an event with the same
+     * resource path and host name is already recorded in the event cache.
+     *
+     * @param notification notification to process
+     * @throws Exception declared for callers; submission failures are logged, not rethrown
+     */
+    public void invokeMessageFlowForNotification(Notification notification) throws Exception {
+        String cacheKey = notification.getResourcePath() + ":" + notification.getHostName();
+        boolean reserved = false;
+        try {
+            // NOTE(review): contains() followed by add() is not atomic; confirm eventCache is
+            // safe to use from every thread that can reach this method.
+            if (!eventCache.contains(cacheKey)) {
+                eventCache.add(cacheKey);
+                reserved = true;
+                this.executorService.submit(new OrchestratorEventProcessor(
+                        configuration, notification, eventCache, notificationClient));
+            }
+        } catch (Exception e) {
+            if (reserved) {
+                // The processor never started, so nothing else will clear this key; release it
+                // so a later notification for the same path and host is not skipped.
+                eventCache.remove(cacheKey);
+            }
+            LOGGER.error("Failed to submit data orchestrator event to process on path {}",
+                    notification.getResourcePath(), e);
+        }
+    }
+
     public Configuration getConfiguration() {
         return configuration;
     }
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventProcessor.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventProcessor.java
index f6a4e49..e8e0546 100644
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventProcessor.java
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventProcessor.java
@@ -24,6 +24,7 @@
 import org.apache.airavata.datalake.drms.storage.AnyStoragePreference;
 import org.apache.airavata.datalake.drms.storage.TransferMapping;
 import org.apache.airavata.datalake.orchestrator.Configuration;
+import org.apache.airavata.datalake.orchestrator.Constants;
 import org.apache.airavata.datalake.orchestrator.Utils;
 import org.apache.airavata.datalake.orchestrator.connectors.CustosConnector;
 import org.apache.airavata.datalake.orchestrator.connectors.DRMSConnector;
@@ -160,17 +161,16 @@
         }
     }
 
-    private String verifyUser(String userName) throws Exception {
+    private Optional<String> verifyUser(String userName) throws Exception {
         if (custosConnector.findUserByUserName(userName).isEmpty()) {
             Optional<UserRepresentation> userByEmail = custosConnector.findUserByEmail(userName);
             if (userByEmail.isPresent()) {
-                return userByEmail.get().getUsername();
+                return Optional.of(userByEmail.get().getUsername());
             } else {
-                logger.error("No user {} by email or user name", userName);
-                throw new Exception("Could not find the user " + userName);
+                return Optional.empty();
             }
         } else {
-            return userName;
+            return Optional.of(userName);
         }
     }
 
@@ -198,6 +198,11 @@
                         notification.getResourcePath(),
                         notification.getResourceType());
                 logger.error("Resource should be a Folder type");
+                this.drmsConnector.createUnverifiedResource(notification.getAuthToken(), notification.getTenantId(),
+                        notification.getNotificationId(), notification.getResourcePath(), notification.getResourceType(),
+                        Constants.ERROR_CODE_INVALID_TYPE, " Invalid  type :" + notification.getResourceType(), null);
+                throw new Exception("Resource should be a Folder type");
+
             }
 
             String removeBasePath = notification.getResourcePath().substring(notification.getBasePath().length());
@@ -205,11 +210,31 @@
 
             if (splitted.length < 2) {
                 logger.error("Invalid path. Need at least two folder levels from base. {}", removeBasePath);
+                this.drmsConnector.createUnverifiedResource(notification.getAuthToken(), notification.getTenantId(),
+                        notification.getNotificationId(), notification.getResourcePath(), notification.getResourceType(),
+                        Constants.ERROR_CODE_INVALID_PATH, " Invalid  path ", null);
                 throw new Exception("Invalid path. Need at least two folder levels from base");
             }
+            Optional<String> adminUserOp = verifyUser(splitted[0]);
+            Optional<String> ownerOp = verifyUser(splitted[1].split("_")[0]);
 
-            String adminUser = verifyUser(splitted[0]);
-            String owner = verifyUser(splitted[1].split("_")[0]);
+            if (adminUserOp.isEmpty()) {
+                this.drmsConnector.createUnverifiedResource(notification.getAuthToken(), notification.getTenantId(),
+                        notification.getNotificationId(), notification.getResourcePath(), notification.getResourceType(),
+                        Constants.ERROR_CODE_INVALID_USERNAME, " User not verified ", splitted[0]);
+                logger.error("Invalid user. User should be verified users {} . {}", splitted[0], removeBasePath);
+                throw new Exception("Invalid user. User " + splitted[0] + " should be registered users");
+            }
+            if (ownerOp.isEmpty()) {
+                this.drmsConnector.createUnverifiedResource(notification.getAuthToken(), notification.getTenantId(),
+                        notification.getNotificationId(), notification.getResourcePath(), notification.getResourceType(),
+                        Constants.ERROR_CODE_INVALID_USERNAME, " User not verified ", splitted[1]);
+                logger.error("Invalid user. User should be verified users {} . {}", splitted[1], removeBasePath);
+                throw new Exception("Invalid user. User " + splitted[1] + " should be registered users");
+            }
+            // Both lookups succeeded; use the resolved names instead of leaving them null.
+            String adminUser = adminUserOp.get();
+            String owner = ownerOp.get();
 
             Map<String, String> ownerRules = new HashMap<>();
             ownerRules.put(adminUser, "VIEWER");
@@ -393,4 +418,6 @@
                     directoryMetadata.getResourcePath(), sourceStorageId, sourceStorageId, System.currentTimeMillis() - start);
         }
     }
+
+
 }
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/grpc/NotificationApiHandler.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/grpc/NotificationApiHandler.java
index 61fbbd1..d6269c7 100644
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/grpc/NotificationApiHandler.java
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/grpc/NotificationApiHandler.java
@@ -17,29 +17,48 @@
 
 package org.apache.airavata.datalake.orchestrator.handlers.grpc;
 
+import com.google.protobuf.Empty;
+import io.grpc.Status;
 import io.grpc.stub.StreamObserver;
 import org.apache.airavata.datalake.data.orchestrator.api.stub.notification.*;
+import org.apache.airavata.datalake.orchestrator.Configuration;
+import org.apache.airavata.datalake.orchestrator.Utils;
+import org.apache.airavata.datalake.orchestrator.handlers.async.OrchestratorEventHandler;
 import org.apache.airavata.datalake.orchestrator.registry.persistance.entity.notification.NotificationEntity;
 import org.apache.airavata.datalake.orchestrator.registry.persistance.entity.notification.NotificationStatusEntity;
 import org.apache.airavata.datalake.orchestrator.registry.persistance.repository.NotificationEntityRepository;
 import org.apache.airavata.datalake.orchestrator.registry.persistance.repository.NotificationStatusEntityRepository;
 import org.dozer.DozerBeanMapper;
 import org.lognet.springboot.grpc.GRpcService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 
-import java.util.ArrayList;
 import java.util.List;
-import java.util.stream.Stream;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 @GRpcService
 public class NotificationApiHandler extends NotificationServiceGrpc.NotificationServiceImplBase {
 
+    private static final Logger LOGGER = LoggerFactory.getLogger(NotificationApiHandler.class);
+
     @Autowired
     private NotificationEntityRepository notificationRepository;
 
     @Autowired
     private NotificationStatusEntityRepository notificationStatusRepository;
 
+
+    @Autowired
+    private OrchestratorEventHandler orchestratorEventHandler;
+
+    @org.springframework.beans.factory.annotation.Value("${config.path}")
+    private String configPath;
+
+
     @Override
     public void registerNotification(NotificationRegisterRequest request, StreamObserver<NotificationRegisterResponse> responseObserver) {
         DozerBeanMapper mapper = new DozerBeanMapper();
@@ -99,4 +118,56 @@
         responseObserver.onNext(responseBuilder.build());
         responseObserver.onCompleted();
     }
+
+    /**
+     * Looks up a stored notification and hands a copy of it, under a new notification id,
+     * to the orchestrator message flow.
+     *
+     * @param request          carries the id of the stored notification to invoke
+     * @param responseObserver receives {@code status = true} once the copy has been handed over,
+     *                         {@code NOT_FOUND} if no notification has that id, or
+     *                         {@code INTERNAL} if the invocation fails
+     */
+    @Override
+    public void invokeNotification(NotificationInvokeRequest request, StreamObserver<NotificationInvokeResponse> responseObserver) {
+        try {
+            Optional<NotificationEntity> notificationOP = notificationRepository.findById(request.getNotificationId());
+
+            if (!notificationOP.isPresent()) {
+                // Terminate the call explicitly; otherwise the caller never receives a response.
+                responseObserver.onError(Status.NOT_FOUND
+                        .withDescription("No notification found for id " + request.getNotificationId())
+                        .asRuntimeException());
+                return;
+            }
+
+            NotificationEntity notificationEntity = notificationOP.get();
+
+            // NOTE(review): the event type is parsed from the entity's resource type; confirm that
+            // this is intended and that every stored resource type is a valid NotificationType name.
+            Notification notification = Notification
+                    .newBuilder()
+                    .setNotificationId(UUID.randomUUID().toString())
+                    .setBasePath(notificationEntity.getBasePath())
+                    .setResourcePath(notificationEntity.getResourcePath())
+                    .setAuthToken(notificationEntity.getAuthToken())
+                    .setEventType(Notification.NotificationType.valueOf(notificationEntity.getResourceType()))
+                    .setHostName(notificationEntity.getHostName())
+                    .setOccuredTime(notificationEntity.getOccuredTime())
+                    .setTenantId(notificationEntity.getTenantId())
+                    .setResourceType(notificationEntity.getResourceType()).build();
+
+            // NOTE(review): the configuration is reloaded and the handler re-initialized on every
+            // call; confirm that init() is safe to run repeatedly.
+            Configuration configuration = Utils.loadConfig(configPath);
+            orchestratorEventHandler.init(configuration);
+            orchestratorEventHandler.invokeMessageFlowForNotification(notification);
+            responseObserver.onNext(NotificationInvokeResponse.newBuilder().setStatus(true).build());
+            responseObserver.onCompleted();
+        } catch (Exception ex) {
+            String msg = "Notification invocation failed for id " + request.getNotificationId();
+            LOGGER.error(msg, ex);
+            responseObserver.onError(Status.INTERNAL.withDescription(msg).asRuntimeException());
+        }
+    }
 }
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/resources/application.properties b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/resources/application.properties
index 88dd260..606cc01 100644
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/resources/application.properties
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/resources/application.properties
@@ -17,4 +17,4 @@
 mft.host=localhost
 mft.port=6565
 drms.host=localhost
-drms.port=7070
\ No newline at end of file
+drms.port=7070
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-stub/src/main/proto/notification.proto b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-stub/src/main/proto/notification.proto
index 2a019fb..bef56ed 100644
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-stub/src/main/proto/notification.proto
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-stub/src/main/proto/notification.proto
@@ -73,6 +73,14 @@
     Notification notification = 1;
 }
 
+message NotificationInvokeRequest {
+    string notificationId = 1;
+}
+
+message NotificationInvokeResponse {
+    bool status = 1;
+}
+
 message NotificationRegisterResponse {
 }
 
@@ -109,6 +117,12 @@
     };
     }
 
+    rpc invokeNotification (NotificationInvokeRequest) returns (NotificationInvokeResponse) {
+        option (google.api.http) = {
+      post: "/v1.0/api/dataorch/notification/invoke"
+    };
+    }
+
     rpc listNotifications (NotificationListRequest) returns (NotificationListResponse) {
         option (google.api.http) = {
       get: "/v1.0/api/dataorch/notifications"
diff --git a/data-resource-management-service/drms-rdbms-impl/drms-server/pom.xml b/data-resource-management-service/drms-rdbms-impl/drms-server/pom.xml
index c8b3080..66c2ab3 100644
--- a/data-resource-management-service/drms-rdbms-impl/drms-server/pom.xml
+++ b/data-resource-management-service/drms-rdbms-impl/drms-server/pom.xml
@@ -153,6 +153,17 @@
             <artifactId>drms-core</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.airavata.data.lake</groupId>
+            <artifactId>data-orchestrator-api-stub</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.airavata.data.lake</groupId>
+            <artifactId>data-orchestrator-clients-core</artifactId>
+            <version>${project.version}</version>
+            <scope>compile</scope>
+        </dependency>
 
     </dependencies>
 
diff --git a/data-resource-management-service/drms-rdbms-impl/drms-server/src/main/java/org/apache/airavata/drms/api/handlers/ResourceServiceHandler.java b/data-resource-management-service/drms-rdbms-impl/drms-server/src/main/java/org/apache/airavata/drms/api/handlers/ResourceServiceHandler.java
index ddb7438..7b47784 100644
--- a/data-resource-management-service/drms-rdbms-impl/drms-server/src/main/java/org/apache/airavata/drms/api/handlers/ResourceServiceHandler.java
+++ b/data-resource-management-service/drms-rdbms-impl/drms-server/src/main/java/org/apache/airavata/drms/api/handlers/ResourceServiceHandler.java
@@ -22,17 +22,22 @@
 import com.google.protobuf.util.JsonFormat;
 import io.grpc.Status;
 import io.grpc.stub.StreamObserver;
+import org.apache.airavata.datalake.data.orchestrator.api.stub.notification.NotificationInvokeRequest;
+import org.apache.airavata.datalake.data.orchestrator.api.stub.notification.NotificationInvokeResponse;
 import org.apache.airavata.datalake.drms.AuthenticatedUser;
 import org.apache.airavata.datalake.drms.resource.GenericResource;
 import org.apache.airavata.datalake.drms.storage.*;
+import org.apache.airavata.dataorchestrator.clients.core.NotificationClient;
 import org.apache.airavata.drms.api.persistance.mapper.ResourceMapper;
 import org.apache.airavata.drms.api.persistance.mapper.StorageMapper;
 import org.apache.airavata.drms.api.persistance.model.Resource;
 import org.apache.airavata.drms.api.persistance.model.ResourceProperty;
 import org.apache.airavata.drms.api.persistance.model.TransferMapping;
+import org.apache.airavata.drms.api.persistance.model.UnverifiedResource;
 import org.apache.airavata.drms.api.persistance.repository.ResourcePropertyRepository;
 import org.apache.airavata.drms.api.persistance.repository.ResourceRepository;
 import org.apache.airavata.drms.api.persistance.repository.TransferMappingRepository;
+import org.apache.airavata.drms.api.persistance.repository.UnverifiedResourceRepository;
 import org.apache.airavata.drms.api.utils.CustosUtils;
 import org.apache.airavata.drms.core.constants.SharingConstants;
 import org.apache.airavata.drms.core.constants.StorageConstants;
@@ -42,12 +47,15 @@
 import org.apache.custos.sharing.core.SearchCondition;
 import org.apache.custos.sharing.core.SearchCriteria;
 import org.apache.custos.sharing.management.client.SharingManagementClient;
-import org.apache.custos.sharing.service.*;
+import org.apache.custos.sharing.service.Entities;
+import org.apache.custos.sharing.service.SearchRequest;
 import org.json.JSONObject;
 import org.lognet.springboot.grpc.GRpcService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.domain.Page;
+import org.springframework.data.domain.PageRequest;
 
 import java.io.IOException;
 import java.util.*;
@@ -71,6 +79,15 @@
     @Autowired
     private TransferMappingRepository transferMappingRepository;
 
+    @Autowired
+    private UnverifiedResourceRepository unverifiedResourceRepository;
+
+    @org.springframework.beans.factory.annotation.Value("${orch.host}")
+    private String orchHost;
+
+    @org.springframework.beans.factory.annotation.Value("${orch.port}")
+    private int orchPort;
+
 
     @Override
     public void fetchResource(ResourceFetchRequest request, StreamObserver<ResourceFetchResponse> responseObserver) {
@@ -152,7 +169,7 @@
                     callUser.getUsername());
 
             if (exEntity.isPresent()) {
-                Resource resource = ResourceMapper.map(request.getResource(),null, exEntity.get(), callUser);
+                Resource resource = ResourceMapper.map(request.getResource(), null, exEntity.get(), callUser);
                 resource.setResourceType(type);
                 resource.setParentResourceId(parentId);
                 resourceRepository.save(resource);
@@ -256,37 +273,37 @@
             String entityId = request.getResource().getResourceId();
             String name = request.getResource().getResourceName();
 
-           Optional<Resource> exResource =  resourceRepository.findById(entityId);
-           if (exResource.isPresent()) {
-               List<ResourceProperty> resourceProperties = resourcePropertyRepository.
-                       findByPropertyKeyAndResourceId("owner",exResource.get().getId());
-               if (parentId == null|| parentId.isEmpty())
-                   parentId = exResource.get().getParentResourceId();
-               if (!resourceProperties.isEmpty()) {
-                   Optional<Entity> exEntity = CustosUtils.mergeResourceEntity(custosClientProvider, callUser.getTenantId(),
-                           parentId, type, entityId,
-                           request.getResource().getResourceName(), request.getResource().getResourceName(),
-                           resourceProperties.get(0).getPropertyValue());
+            Optional<Resource> exResource = resourceRepository.findById(entityId);
+            if (exResource.isPresent()) {
+                List<ResourceProperty> resourceProperties = resourcePropertyRepository.
+                        findByPropertyKeyAndResourceId("owner", exResource.get().getId());
+                if (parentId == null || parentId.isEmpty())
+                    parentId = exResource.get().getParentResourceId();
+                if (!resourceProperties.isEmpty()) {
+                    Optional<Entity> exEntity = CustosUtils.mergeResourceEntity(custosClientProvider, callUser.getTenantId(),
+                            parentId, type, entityId,
+                            request.getResource().getResourceName(), request.getResource().getResourceName(),
+                            resourceProperties.get(0).getPropertyValue());
 
-                   if (exEntity.isPresent()) {
-                       Resource resource = ResourceMapper.map(request.getResource(), exResource.get(),exEntity.get(), callUser);
-                       resource.setResourceType(type);
-                       resource.setParentResourceId(parentId);
+                    if (exEntity.isPresent()) {
+                        Resource resource = ResourceMapper.map(request.getResource(), exResource.get(), exEntity.get(), callUser);
+                        resource.setResourceType(type);
+                        resource.setParentResourceId(parentId);
 
-                       resourceRepository.save(resource);
+                        resourceRepository.save(resource);
 
-                       GenericResource genericResource = ResourceMapper.map(resource, exEntity.get());
+                        GenericResource genericResource = ResourceMapper.map(resource, exEntity.get());
 
-                       ResourceUpdateResponse response = ResourceUpdateResponse
-                               .newBuilder()
-                               .setResource(genericResource)
-                               .build();
-                       responseObserver.onNext(response);
-                       responseObserver.onCompleted();
-                       return;
-                   }
-               }
-           }
+                        ResourceUpdateResponse response = ResourceUpdateResponse
+                                .newBuilder()
+                                .setResource(genericResource)
+                                .build();
+                        responseObserver.onNext(response);
+                        responseObserver.onCompleted();
+                        return;
+                    }
+                }
+            }
             //TODO: Error
 
 
@@ -307,6 +324,8 @@
                                        request, StreamObserver<ResourceSearchResponse> responseObserver) {
         AuthenticatedUser callUser = request.getAuthToken().getAuthenticatedUser();
 
+        invokeUnVerifiedResourceRegistrationWorkflow(callUser.getUsername());
+
         List<ResourceSearchQuery> resourceSearchQueries = request.getQueriesList();
 
         SearchRequest.Builder searchRequestBuilder = SearchRequest.newBuilder();
@@ -341,7 +360,7 @@
         Optional<TransferMapping> transferMappingOptional = transferMappingRepository.
                 findTransferMappingByScope(TransferScope.GLOBAL.name());
 
-      if (transferMappingOptional.isPresent() && searchMap.isEmpty() && !type.equalsIgnoreCase("COLLECTION_GROUP")) {
+        if (transferMappingOptional.isPresent() && searchMap.isEmpty() && !type.equalsIgnoreCase("COLLECTION_GROUP")) {
             TransferMapping transferMapping = transferMappingOptional.get();
             String sourceId = transferMapping.getSource().getId();
 
@@ -412,34 +431,34 @@
             AuthenticatedUser callUser = request.getAuthToken().getAuthenticatedUser();
             GenericResource resource = request.getParentResource();
             List<GenericResource> childResources = request.getChildResourcesList();
-                childResources.forEach(childResource-> {
+            childResources.forEach(childResource -> {
 
-                      List<ResourceProperty> resourceProperties =  resourcePropertyRepository.
-                              findByPropertyKeyAndResourceId("resourceName",childResource.getResourceId());
-                    Optional<Resource>  exRes = resourceRepository.findById(childResource.getResourceId());
-                   try{
-                      if(!resourceProperties.isEmpty() && exRes.isPresent()) {
-                          CustosUtils.mergeResourceEntity(custosClientProvider, callUser.getTenantId(),
-                                  resource.getResourceId(), childResource.getType(), childResource.getResourceId(),
-                                  resourceProperties.get(0).getPropertyValue(), resourceProperties.get(0).getPropertyValue(),
-                                  callUser.getUsername());
-                       Resource chResource =  exRes.get();
-                       chResource.setParentResourceId(resource.getResourceId());
-                       resourceRepository.save(chResource);
+                List<ResourceProperty> resourceProperties = resourcePropertyRepository.
+                        findByPropertyKeyAndResourceId("resourceName", childResource.getResourceId());
+                Optional<Resource> exRes = resourceRepository.findById(childResource.getResourceId());
+                try {
+                    if (!resourceProperties.isEmpty() && exRes.isPresent()) {
+                        CustosUtils.mergeResourceEntity(custosClientProvider, callUser.getTenantId(),
+                                resource.getResourceId(), childResource.getType(), childResource.getResourceId(),
+                                resourceProperties.get(0).getPropertyValue(), resourceProperties.get(0).getPropertyValue(),
+                                callUser.getUsername());
+                        Resource chResource = exRes.get();
+                        chResource.setParentResourceId(resource.getResourceId());
+                        resourceRepository.save(chResource);
 
 
-                      }
-                    } catch (IOException e) {
-                        String msg = " Error occurred while adding  child memberships " + e.getMessage();
-                        logger.error(" Error occurred while adding  child memberships: Messages {} ", e.getMessage(), e);
                     }
-                });
-                OperationStatusResponse operationStatusResponse = OperationStatusResponse
-                        .newBuilder().
-                                setStatus(true)
-                        .build();
-           responseObserver.onNext(operationStatusResponse);
-           responseObserver.onCompleted();
+                } catch (IOException e) {
+                    String msg = " Error occurred while adding  child memberships " + e.getMessage();
+                    logger.error(" Error occurred while adding  child memberships: Messages {} ", e.getMessage(), e);
+                }
+            });
+            OperationStatusResponse operationStatusResponse = OperationStatusResponse
+                    .newBuilder().
+                            setStatus(true)
+                    .build();
+            responseObserver.onNext(operationStatusResponse);
+            responseObserver.onCompleted();
 
         } catch (Exception e) {
             String msg = " Error occurred while adding  child memberships " + e.getMessage();
@@ -457,16 +476,16 @@
             AuthenticatedUser callUser = request.getAuthToken().getAuthenticatedUser();
             GenericResource resource = request.getParentResource();
             List<GenericResource> childResources = request.getChildResourcesList();
-            childResources.forEach(childResource-> {
-                List<ResourceProperty> resourceProperties =  resourcePropertyRepository.findByPropertyKeyAndResourceId("resourceName",childResource.getResourceId());
-                Optional<Resource>  exRes = resourceRepository.findById(childResource.getResourceId());
+            childResources.forEach(childResource -> {
+                List<ResourceProperty> resourceProperties = resourcePropertyRepository.findByPropertyKeyAndResourceId("resourceName", childResource.getResourceId());
+                Optional<Resource> exRes = resourceRepository.findById(childResource.getResourceId());
                 try {
-                    if(!resourceProperties.isEmpty() && exRes.isPresent()) {
+                    if (!resourceProperties.isEmpty() && exRes.isPresent()) {
                         CustosUtils.mergeResourceEntity(custosClientProvider, callUser.getTenantId(),
                                 "", childResource.getType(), childResource.getResourceId(),
                                 resourceProperties.get(0).getPropertyValue(), resourceProperties.get(0).getPropertyValue(),
                                 callUser.getUsername());
-                        Resource chResource =  exRes.get();
+                        Resource chResource = exRes.get();
                         chResource.setParentResourceId(null);
                         resourceRepository.save(chResource);
                     }
@@ -643,6 +662,92 @@
 
     }
 
+
+    /**
+     * Returns one page of the recorded unverified resources as GenericResources whose properties
+     * carry the stored error code and error description.
+     * @param request          its offset is used as the zero-based page number, its limit as the page size
+     * @param responseObserver receives the page, or an INTERNAL status when the lookup fails
+     */
+    @Override
+    public void fetchUnverifiedResources(ResourceSearchRequest request, StreamObserver<ResourceSearchResponse> responseObserver) {
+        try {
+            Page<UnverifiedResource> resources = unverifiedResourceRepository.findAll(PageRequest.of(request.getOffset(), request.getLimit()));
+            ResourceSearchResponse.Builder response = ResourceSearchResponse.newBuilder();
+            resources.forEach(resource -> {
+                Map<String, String> prop = new HashMap<>();
+                prop.put("ERROR_CODE", resource.getErrorCode());
+                prop.put("ERROR_DISCRIPTION", resource.getErrorDiscription());
+                // Both columns are nullable and protobuf map fields reject null values, so drop unset ones.
+                prop.values().removeIf(value -> value == null);
+                response.addResources(GenericResource.newBuilder()
+                        .setResourceId(resource.getId())
+                        .setResourcePath(resource.getPath())
+                        .setType(resource.getType())
+                        .putAllProperties(prop).build());
+            });
+            responseObserver.onNext(response.build());
+            responseObserver.onCompleted();
+        } catch (Exception ex) {
+            String msg = " Error occurred while fetching unverified resources  " + ex.getMessage();
+            logger.error(" Error occurred while fetching unverified resources {} ", ex.getMessage(), ex);
+            responseObserver.onError(Status.INTERNAL.withDescription(msg).asRuntimeException());
+        }
+    }
+
+    /**
+     * Records a resource that could not be verified, together with the error code and description
+     * supplied in the resource properties, and echoes the submitted resource back to the caller.
+     *
+     * @param request          carries the resource (id, path, type, properties) and the caller's auth token
+     * @param responseObserver receives the echoed resource, or an INTERNAL status when saving fails
+     */
+    @Override
+    public void createUnverifiedResource(ResourceCreateRequest request, StreamObserver<ResourceCreateResponse> responseObserver) {
+        try {
+            AuthenticatedUser callUser = request.getAuthToken().getAuthenticatedUser();
+            GenericResource resource = request.getResource();
+            Map<String, String> properties = resource.getPropertiesMap();
+            // fetchUnverifiedResources publishes the description as "ERROR_DISCRIPTION"; accept that
+            // spelling too so a fetched resource can be submitted back without losing it.
+            String errorDescription = properties.containsKey("ERROR_DESCRIPTION")
+                    ? properties.get("ERROR_DESCRIPTION")
+                    : properties.get("ERROR_DISCRIPTION");
+
+            UnverifiedResource unverifiedResource = new UnverifiedResource();
+            unverifiedResource.setId(resource.getResourceId());
+            unverifiedResource.setPath(resource.getResourcePath());
+            unverifiedResource.setErrorCode(properties.get("ERROR_CODE"));
+            unverifiedResource.setErrorDiscription(errorDescription);
+            unverifiedResource.setTenantId(callUser.getTenantId());
+            unverifiedResource.setType(resource.getType());
+            // The owner is only recorded when the caller has a username.
+            if (!callUser.getUsername().isEmpty()) {
+                unverifiedResource.setUnverifiedAssociatedOwner(callUser.getUsername());
+            }
+            unverifiedResourceRepository.save(unverifiedResource);
+
+            responseObserver.onNext(ResourceCreateResponse.newBuilder().setResource(resource).build());
+            responseObserver.onCompleted();
+        } catch (Exception ex) {
+            logger.error("Error occurred while creating unverified resource {}", request.getResource().getResourceId(), ex);
+            String msg = "Error occurred while creating unverified resource " + ex.getMessage();
+            responseObserver.onError(Status.INTERNAL.withDescription(msg).asRuntimeException());
+        }
+    }
+
+    /** Deletes the unverified resource with the request's resource id, then answers and completes the call. */
+    @Override
+    public void deleteUnverifiedResource(ResourceUpdateRequest request, StreamObserver<ResourceUpdateResponse> responseObserver) {
+        try {
+            unverifiedResourceRepository.deleteById(request.getResourceId());
+            responseObserver.onNext(ResourceUpdateResponse.newBuilder().setResource(request.getResource()).build());
+            responseObserver.onCompleted(); // without a reply and completion the caller never sees the RPC finish
+        } catch (Exception ex) {
+            logger.error("Error occurred while deleting unverified resource {}", request.getResourceId(), ex);
+            responseObserver.onError(Status.INTERNAL.withDescription("Error occurred while deleting unverified resource " + request.getResourceId()).asRuntimeException());
+        }
+    }
     private Set<ResourceProperty> mergeProperties(Resource resource, Map<String, Object> values) {
 
         Set<ResourceProperty> exProperties = resource.getResourceProperty();
@@ -678,4 +783,30 @@
         return newProperties;
     }
 
+
+    /**
+     * Re-invokes the notification behind every unverified resource recorded for {@code username}
+     * with error code ERR_0002, deleting each record whose invocation reports success.
+     * ERR_0002 is the value the orchestrator's Constants names ERROR_CODE_INVALID_USERNAME.
+     *
+     * @param username owner recorded on the unverified resources to retry
+     */
+    private void invokeUnVerifiedResourceRegistrationWorkflow(String username) {
+        List<UnverifiedResource> unverifiedResources = unverifiedResourceRepository
+                .getUnverifiedResourceByUnverifiedAssociatedOwnerAndErrorCode(username, "ERR_0002");
+        if (unverifiedResources.isEmpty()) {
+            // This runs on each search request: do not build a notification client when nothing is pending.
+            return;
+        }
+        // NOTE(review): the client is never closed here — confirm whether NotificationClient requires it.
+        NotificationClient notificationClient = new NotificationClient(orchHost, orchPort);
+        for (UnverifiedResource unverifiedResource : unverifiedResources) {
+            NotificationInvokeResponse response = notificationClient.get().invokeNotification(
+                    NotificationInvokeRequest.newBuilder().setNotificationId(unverifiedResource.getId()).build());
+            if (response.getStatus()) {
+                unverifiedResourceRepository.deleteById(unverifiedResource.getId());
+            }
+        }
+    }
+
 }
diff --git a/data-resource-management-service/drms-rdbms-impl/drms-server/src/main/java/org/apache/airavata/drms/api/interceptors/Authenticator.java b/data-resource-management-service/drms-rdbms-impl/drms-server/src/main/java/org/apache/airavata/drms/api/interceptors/Authenticator.java
index aa3007f..beb9564 100644
--- a/data-resource-management-service/drms-rdbms-impl/drms-server/src/main/java/org/apache/airavata/drms/api/interceptors/Authenticator.java
+++ b/data-resource-management-service/drms-rdbms-impl/drms-server/src/main/java/org/apache/airavata/drms/api/interceptors/Authenticator.java
@@ -120,7 +120,7 @@
                         String tenantId = drmsServiceAuthToken.getAuthenticatedUser().getTenantId();
                         Struct struct = identityManagementClient
                                 .getAgentToken(tenantId, agentClientId, agentClientSec, "client_credentials", "");
-                        if (struct.getFieldsMap().get("access_token").isInitialized()) {
+                        if (struct.getFieldsMap().get("access_token").isInitialized() && ! drmsServiceAuthToken.getUserUnverified()) {
                             UserRepresentation user = userManagementClient.getUser(username, tenantId);
                             return Optional.ofNullable(AuthenticatedUser.newBuilder()
                                     .setUsername(user.getUsername())
@@ -129,6 +129,14 @@
                                     .setEmailAddress(user.getEmail())
                                     .setTenantId(tenantId)
                                     .build());
+                        } else {
+                          // Reached when the token is flagged user_unverified or no agent access token was
+                          // issued: pass on the tenant and, only when one was supplied, the username.
+                          AuthenticatedUser.Builder user = AuthenticatedUser.newBuilder().setTenantId(tenantId);
+                          if (username != null && !username.isEmpty()) {
+                              user.setUsername(username); // protobuf setters throw on null, hence the guard
+                          }
+                          return Optional.ofNullable(user.build());
                         }
                     }
                 } else {
diff --git a/data-resource-management-service/drms-rdbms-impl/drms-server/src/main/java/org/apache/airavata/drms/api/persistance/model/UnverifiedResource.java b/data-resource-management-service/drms-rdbms-impl/drms-server/src/main/java/org/apache/airavata/drms/api/persistance/model/UnverifiedResource.java
new file mode 100644
index 0000000..b7b44ae
--- /dev/null
+++ b/data-resource-management-service/drms-rdbms-impl/drms-server/src/main/java/org/apache/airavata/drms/api/persistance/model/UnverifiedResource.java
@@ -0,0 +1,91 @@
+package org.apache.airavata.drms.api.persistance.model;
+
+
+import org.springframework.data.jpa.domain.support.AuditingEntityListener;
+
+import javax.persistence.*;
+
+/**
+ * JPA entity mapped to the UNVERIFIED_RESOURCE table: a resource id, tenant, path and type
+ * together with the error code, error description and owner recorded for it.
+ */
+@Entity
+@Table(name = "UNVERIFIED_RESOURCE")
+@EntityListeners(AuditingEntityListener.class)
+public class UnverifiedResource {
+
+    // Primary key.
+    @Id
+    @Column(name = "ID")
+    private String id;
+
+    // Declared non-nullable.
+    @Column(name = "TENANT_ID", nullable = false)
+    private String tenantId;
+
+    @Column(name = "RESOURCE_PATH")
+    private String path;
+
+    @Column(name = "ERROR_CODE")
+    private String errorCode;
+
+    // The "DISCRIPTION" spelling is part of the column name and of the accessor names.
+    @Column(name = "ERROR_DISCRIPTION")
+    private String errorDiscription;
+
+    @Column(name = "TYPE")
+    private String type;
+
+    @Column(name = "UNVERIFIED_OWNER")
+    private String unverifiedAssociatedOwner;
+
+    // Plain accessors: each getter returns its field and each setter overwrites it.
+    public String getId() {
+        return this.id;
+    }
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    public String getTenantId() {
+        return this.tenantId;
+    }
+    public void setTenantId(String tenantId) {
+        this.tenantId = tenantId;
+    }
+
+    public String getPath() {
+        return this.path;
+    }
+    public void setPath(String path) {
+        this.path = path;
+    }
+
+    public String getErrorCode() {
+        return this.errorCode;
+    }
+    public void setErrorCode(String errorCode) {
+        this.errorCode = errorCode;
+    }
+
+    public String getErrorDiscription() {
+        return this.errorDiscription;
+    }
+    public void setErrorDiscription(String errorDiscription) {
+        this.errorDiscription = errorDiscription;
+    }
+
+    public String getType() {
+        return this.type;
+    }
+    public void setType(String type) {
+        this.type = type;
+    }
+
+    public String getUnverifiedAssociatedOwner() {
+        return this.unverifiedAssociatedOwner;
+    }
+    public void setUnverifiedAssociatedOwner(String unverifiedAssociatedOwner) {
+        this.unverifiedAssociatedOwner = unverifiedAssociatedOwner;
+    }
+}
diff --git a/data-resource-management-service/drms-rdbms-impl/drms-server/src/main/java/org/apache/airavata/drms/api/persistance/repository/UnverifiedResourceRepository.java b/data-resource-management-service/drms-rdbms-impl/drms-server/src/main/java/org/apache/airavata/drms/api/persistance/repository/UnverifiedResourceRepository.java
new file mode 100644
index 0000000..2ad5877
--- /dev/null
+++ b/data-resource-management-service/drms-rdbms-impl/drms-server/src/main/java/org/apache/airavata/drms/api/persistance/repository/UnverifiedResourceRepository.java
@@ -0,0 +1,15 @@
+package org.apache.airavata.drms.api.persistance.repository;
+
+import org.apache.airavata.drms.api.persistance.model.Resource;
+import org.apache.airavata.drms.api.persistance.model.UnverifiedResource;
+import org.springframework.data.jpa.repository.JpaRepository;
+import org.springframework.data.jpa.repository.Query;
+
+import java.util.List;
+
+/** Spring Data JPA repository for {@link UnverifiedResource} entities, keyed by their String id. */
+public interface UnverifiedResourceRepository extends JpaRepository<UnverifiedResource, String> {
+
+    /** Finds the entities whose unverifiedAssociatedOwner and errorCode match the given values. */
+    List<UnverifiedResource> getUnverifiedResourceByUnverifiedAssociatedOwnerAndErrorCode(String username, String errorCode);
+}
diff --git a/data-resource-management-service/drms-stubs/src/main/proto/Common.proto b/data-resource-management-service/drms-stubs/src/main/proto/Common.proto
index 604c1fd..6bcfeea 100644
--- a/data-resource-management-service/drms-stubs/src/main/proto/Common.proto
+++ b/data-resource-management-service/drms-stubs/src/main/proto/Common.proto
@@ -35,6 +35,7 @@
     string access_token = 1;
     AuthenticatedUser authenticated_user = 2;
     AuthCredentialType auth_credential_type =3;
+    bool user_unverified =4;
 }
 
 enum AuthCredentialType {
diff --git a/data-resource-management-service/drms-stubs/src/main/proto/resource/DRMSResource.proto b/data-resource-management-service/drms-stubs/src/main/proto/resource/DRMSResource.proto
index c696cb0..b6eb64d 100644
--- a/data-resource-management-service/drms-stubs/src/main/proto/resource/DRMSResource.proto
+++ b/data-resource-management-service/drms-stubs/src/main/proto/resource/DRMSResource.proto
@@ -36,4 +36,5 @@
   string parent_resource_path = 7;
   string resource_name = 8;
   string parent_id = 9;
+  
 }
\ No newline at end of file
diff --git a/data-resource-management-service/drms-stubs/src/main/proto/resource/DRMSResourceService.proto b/data-resource-management-service/drms-stubs/src/main/proto/resource/DRMSResourceService.proto
index f74f21e..2d886bc 100644
--- a/data-resource-management-service/drms-stubs/src/main/proto/resource/DRMSResourceService.proto
+++ b/data-resource-management-service/drms-stubs/src/main/proto/resource/DRMSResourceService.proto
@@ -86,6 +86,8 @@
     repeated ResourceSearchQuery queries = 2;
     int32 depth = 4;
     string type = 5;
+    int32 offset=6;
+    int32 limit=7;
 }
 
 message ResourceSearchResponse {
@@ -222,5 +224,23 @@
     };
     }
 
+    // Lists recorded unverified resources; the server implementation pages using the request's offset and limit.
+    rpc fetchUnverifiedResources (ResourceSearchRequest) returns (ResourceSearchResponse) {
+        option (google.api.http) = {
+      get: "/v1.0/api/drms/resources/unverified"
+    };
+    }
+    // Records an unverified resource; the server implementation reads ERROR_CODE and ERROR_DESCRIPTION from its properties.
+    rpc createUnverifiedResource (ResourceCreateRequest) returns (ResourceCreateResponse) {
+        option (google.api.http) = {
+      post: "/v1.0/api/drms/resource/unverified"
+    };
+    }
+    // Removes a recorded unverified resource; the server implementation deletes by the request's resource id.
+    rpc deleteUnverifiedResource (ResourceUpdateRequest) returns (ResourceUpdateResponse) {
+        option (google.api.http) = {
+      delete: "/v1.0/api/drms/resource/unverified"
+    };
+    }
 
 }
\ No newline at end of file