| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.airavata.drms.api.handlers; |
| |
| import com.google.protobuf.Empty; |
| import io.grpc.ManagedChannel; |
| import io.grpc.ManagedChannelBuilder; |
| import io.grpc.stub.StreamObserver; |
| import org.apache.airavata.datalake.drms.DRMSServiceAuthToken; |
| import org.apache.airavata.datalake.drms.groups.FetchCurrentUserRequest; |
| import org.apache.airavata.datalake.drms.groups.FetchCurrentUserResponse; |
| import org.apache.airavata.datalake.drms.groups.GroupServiceGrpc; |
| import org.apache.airavata.datalake.drms.groups.User; |
| import org.apache.airavata.datalake.drms.resource.GenericResource; |
| import org.apache.airavata.datalake.drms.storage.*; |
| import org.apache.airavata.drms.core.Neo4JConnector; |
| import org.apache.airavata.drms.core.constants.ResourceConstants; |
| import org.apache.airavata.drms.core.constants.StorageConstants; |
| import org.apache.airavata.drms.core.deserializer.AnyStoragePreferenceDeserializer; |
| import org.apache.airavata.drms.core.deserializer.GenericResourceDeserializer; |
| import org.apache.airavata.drms.core.deserializer.MetadataDeserializer; |
| import org.lognet.springboot.grpc.GRpcService; |
| import org.neo4j.driver.Record; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.springframework.beans.factory.annotation.Autowired; |
| |
| import java.util.List; |
| |
| @GRpcService |
| public class ResourceServiceHandler extends ResourceServiceGrpc.ResourceServiceImplBase { |
| |
| private static final Logger logger = LoggerFactory.getLogger(ResourceServiceHandler.class); |
| |
| @Autowired |
| private Neo4JConnector neo4JConnector; |
| |
| @org.springframework.beans.factory.annotation.Value("${group.service.host}") |
| private String groupServiceHost; |
| |
| @org.springframework.beans.factory.annotation.Value("${group.service.port}") |
| private int groupServicePort; |
| |
| private User getUser(DRMSServiceAuthToken authToken) { |
| ManagedChannel channel = ManagedChannelBuilder.forAddress(groupServiceHost, groupServicePort).usePlaintext().build(); |
| GroupServiceGrpc.GroupServiceBlockingStub groupClient = GroupServiceGrpc.newBlockingStub(channel); |
| FetchCurrentUserResponse userResponse = groupClient.fetchCurrentUser( |
| FetchCurrentUserRequest.newBuilder().setAuthToken(authToken).build()); |
| return userResponse.getUser(); |
| } |
| |
| @Override |
| public void fetchResource(ResourceFetchRequest request, StreamObserver<ResourceFetchResponse> responseObserver) { |
| User callUser = getUser(request.getAuthToken()); |
| |
| // TODO review (u)-[r4:MEMBER_OF]->(g2:Group)<-[r5:SHARED_WITH]-(sp), |
| List<Record> records = this.neo4JConnector.searchNodes( |
| "MATCH (u:User)-[r1:MEMBER_OF]->(g:Group)<-[r2:SHARED_WITH]-(s:Storage)-[r3:HAS_PREFERENCE]->(sp:StoragePreference)-[r6:HAS_RESOURCE]->(res:Resource), " + |
| "(u)-[r7:MEMBER_OF]->(g3:Group)<-[r8:SHARED_WITH]-(res) " + |
| "where res.resourceId = '" + request.getResourceId() + "' and u.userId = '" |
| + callUser.getUserId() + "' return distinct res, sp, s"); |
| |
| if (!records.isEmpty()) { |
| try { |
| List<GenericResource> genericResourceList = GenericResourceDeserializer.deserializeList(records); |
| responseObserver.onNext(ResourceFetchResponse.newBuilder().setResource(genericResourceList.get(0)).build()); |
| responseObserver.onCompleted(); |
| } catch (Exception e) { |
| |
| logger.error("Errored while fetching resource with id {}", request.getResourceId(), e); |
| responseObserver.onError(new Exception("Errored while fetching resource with id " |
| + request.getResourceId() + ". Msg " + e.getMessage())); |
| } |
| } else { |
| logger.error("Could not find a generic resource with id {}", request.getResourceId()); |
| responseObserver.onError(new Exception("Could not find a generic resource with id " |
| + request.getResourceId())); |
| } |
| } |
| |
| @Override |
| public void createResource(ResourceCreateRequest request, StreamObserver<ResourceCreateResponse> responseObserver) { |
| super.createResource(request, responseObserver); |
| } |
| |
| @Override |
| public void updateResource(ResourceUpdateRequest request, StreamObserver<ResourceUpdateResponse> responseObserver) { |
| super.updateResource(request, responseObserver); |
| } |
| |
| @Override |
| public void deletePreferenceStorage(ResourceDeleteRequest request, StreamObserver<Empty> responseObserver) { |
| super.deletePreferenceStorage(request, responseObserver); |
| } |
| |
| @Override |
| public void searchResource(ResourceSearchRequest request, StreamObserver<ResourceSearchResponse> responseObserver) { |
| User callUser = getUser(request.getAuthToken()); |
| |
| // TODO review (u)-[r4:MEMBER_OF]->(g2:Group)<-[r5:SHARED_WITH]-(sp), |
| List<Record> records = this.neo4JConnector.searchNodes( |
| "MATCH (u:User)-[r1:MEMBER_OF]->(g:Group)<-[r2:SHARED_WITH]-(s:Storage)-[r3:HAS_PREFERENCE]->(sp:StoragePreference)-[r6:HAS_RESOURCE]->(res:Resource), " + |
| "(u)-[r7:MEMBER_OF]->(g3:Group)<-[r8:SHARED_WITH]-(res) " + |
| "where u.userId = '" + callUser.getUserId() + "' return distinct res, sp, s"); |
| try { |
| List<GenericResource> genericResourceList = GenericResourceDeserializer.deserializeList(records); |
| ResourceSearchResponse.Builder builder = ResourceSearchResponse.newBuilder(); |
| builder.addAllResources(genericResourceList); |
| responseObserver.onNext(builder.build()); |
| responseObserver.onCompleted(); |
| |
| } catch (Exception e) { |
| logger.error("Errored while searching generic resources; Message: {}", e.getMessage(), e); |
| responseObserver.onError(e); |
| } |
| } |
| |
| @Override |
| public void addResourceMetadata(AddResourceMetadataRequest request, StreamObserver<Empty> responseObserver) { |
| User callUser = getUser(request.getAuthToken()); |
| this.neo4JConnector.createMetadataNode(ResourceConstants.RESOURCE_LABEL, "resourceId", |
| request.getResourceId(), callUser.getUserId(), |
| request.getMetadata().getKey(), request.getMetadata().getValue()); |
| responseObserver.onNext(Empty.getDefaultInstance()); |
| responseObserver.onCompleted(); |
| } |
| |
| @Override |
| public void fetchResourceMetadata(FetchResourceMetadataRequest request, StreamObserver<FetchResourceMetadataResponse> responseObserver) { |
| User callUser = getUser(request.getAuthToken()); |
| List<Record> records = neo4JConnector.searchNodes("match (u:User)-[MEMBER_OF]->(g:Group)<-[SHARED_WITH]-(res:Resource)-[r:HAS_METADATA]->(m:Metadata) " + |
| "where u.userId ='" + callUser.getUserId()+ "' and res.resourceId = '" + request.getResourceId() + "' return distinct m"); |
| try { |
| List<MetadataNode> metadataNodes = MetadataDeserializer.deserializeList(records); |
| if (metadataNodes.size() == 1) { |
| responseObserver.onNext(FetchResourceMetadataResponse.newBuilder().setMetadataNode(metadataNodes.get(0)).build()); |
| responseObserver.onCompleted(); |
| } else { |
| logger.error("No metadata entry for resource {}", request.getResourceId()); |
| responseObserver.onError(new Exception("No metadata entry for resource " + request.getResourceId())); |
| } |
| } catch (Exception e) { |
| logger.error("Errored while fetching metadata for resource with id {}", request.getResourceId(), e); |
| responseObserver.onError(new Exception("Errored while fetching metadata for resource with id " |
| + request.getResourceId() + ". Msg " + e.getMessage())); |
| } |
| } |
| } |