/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.airavata.drms.api.handlers;

import com.google.protobuf.Empty;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import org.apache.airavata.datalake.drms.DRMSServiceAuthToken;
import org.apache.airavata.datalake.drms.groups.FetchCurrentUserRequest;
import org.apache.airavata.datalake.drms.groups.FetchCurrentUserResponse;
import org.apache.airavata.datalake.drms.groups.GroupServiceGrpc;
import org.apache.airavata.datalake.drms.groups.User;
import org.apache.airavata.datalake.drms.resource.GenericResource;
import org.apache.airavata.datalake.drms.storage.*;
import org.apache.airavata.drms.core.Neo4JConnector;
import org.apache.airavata.drms.core.constants.ResourceConstants;
import org.apache.airavata.drms.core.constants.StorageConstants;
import org.apache.airavata.drms.core.deserializer.AnyStoragePreferenceDeserializer;
import org.apache.airavata.drms.core.deserializer.GenericResourceDeserializer;
import org.apache.airavata.drms.core.deserializer.MetadataDeserializer;
import org.lognet.springboot.grpc.GRpcService;
import org.neo4j.driver.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.List;

/**
 * gRPC handler for the DRMS resource service.
 *
 * <p>Each implemented call resolves the calling user through the group service and then runs a
 * Cypher query through {@link Neo4JConnector} that is restricted to that user's id.
 * {@code createResource}, {@code updateResource} and {@code deletePreferenceStorage} are not
 * implemented here and simply delegate to the generated base class.
 */
@GRpcService
public class ResourceServiceHandler extends ResourceServiceGrpc.ResourceServiceImplBase {

    private static final Logger logger = LoggerFactory.getLogger(ResourceServiceHandler.class);

    /**
     * MATCH clause shared by the fetch and search queries: a resource reachable through a storage
     * and storage preference, where both the storage and the resource itself are shared with a
     * group the user is a member of.
     */
    // TODO review (u)-[r4:MEMBER_OF]->(g2:Group)<-[r5:SHARED_WITH]-(sp),
    private static final String ACCESSIBLE_RESOURCE_MATCH =
            "MATCH (u:User)-[r1:MEMBER_OF]->(g:Group)<-[r2:SHARED_WITH]-(s:Storage)-[r3:HAS_PREFERENCE]->(sp:StoragePreference)-[r6:HAS_RESOURCE]->(res:Resource), " +
                    "(u)-[r7:MEMBER_OF]->(g3:Group)<-[r8:SHARED_WITH]-(res) ";

    @Autowired
    private Neo4JConnector neo4JConnector;

    @org.springframework.beans.factory.annotation.Value("${group.service.host}")
    private String groupServiceHost;

    @org.springframework.beans.factory.annotation.Value("${group.service.port}")
    private int groupServicePort;

    /**
     * Escapes a value so it can be embedded between single quotes in a Cypher query string.
     *
     * <p>Backslashes are escaped first so that the backslash introduced for a quote is not
     * doubled again. This is a mitigation for query text built by concatenation; passing the
     * values as query parameters would be preferable if the connector offers such an API.
     *
     * @param value raw value, typically taken from a request
     * @return the value with {@code \} and {@code '} escaped; an empty string for {@code null}
     */
    private static String escapeCypherLiteral(String value) {
        if (value == null) {
            return "";
        }
        return value.replace("\\", "\\\\").replace("'", "\\'");
    }

    /**
     * Resolves the user that owns the given auth token by calling the group service.
     *
     * <p>A plaintext channel to the configured host and port is opened for the call and shut
     * down before returning, so that channels are not leaked on every request.
     *
     * @param authToken token identifying the caller
     * @return the user reported by the group service
     */
    private User getUser(DRMSServiceAuthToken authToken) {
        ManagedChannel channel = ManagedChannelBuilder.forAddress(groupServiceHost, groupServicePort)
                .usePlaintext()
                .build();
        try {
            GroupServiceGrpc.GroupServiceBlockingStub groupClient = GroupServiceGrpc.newBlockingStub(channel);
            FetchCurrentUserResponse userResponse = groupClient.fetchCurrentUser(
                    FetchCurrentUserRequest.newBuilder().setAuthToken(authToken).build());
            return userResponse.getUser();
        } finally {
            // The blocking call has completed (or failed) by now, so an orderly shutdown is safe.
            channel.shutdown();
        }
    }

    /**
     * Fetches a single resource by id, provided it is accessible to the calling user.
     *
     * <p>Responds with the first deserialized resource, or with an error when no record matches
     * or deserialization fails.
     *
     * @param request          carries the auth token and the resource id
     * @param responseObserver receives the resource or an error
     */
    @Override
    public void fetchResource(ResourceFetchRequest request, StreamObserver<ResourceFetchResponse> responseObserver) {
        User callUser = getUser(request.getAuthToken());

        String resourceIdLiteral = escapeCypherLiteral(request.getResourceId());
        String userIdLiteral = escapeCypherLiteral(callUser.getUserId());

        List<Record> records = this.neo4JConnector.searchNodes(
                ACCESSIBLE_RESOURCE_MATCH +
                        "where res.resourceId = '" + resourceIdLiteral + "' and u.userId = '"
                        + userIdLiteral + "' return distinct res, sp, s");

        if (!records.isEmpty()) {
            try {
                List<GenericResource> genericResourceList = GenericResourceDeserializer.deserializeList(records);
                responseObserver.onNext(ResourceFetchResponse.newBuilder().setResource(genericResourceList.get(0)).build());
                responseObserver.onCompleted();
            } catch (Exception e) {

                logger.error("Errored while fetching resource with id {}", request.getResourceId(), e);
                responseObserver.onError(new Exception("Errored while fetching resource with id "
                        + request.getResourceId() + ". Msg " + e.getMessage()));
            }
        } else {
            logger.error("Could not find a generic resource with id {}", request.getResourceId());
            responseObserver.onError(new Exception("Could not find a generic resource with id "
                    + request.getResourceId()));
        }
    }

    /** Not implemented in this handler; delegates to the generated base class. */
    @Override
    public void createResource(ResourceCreateRequest request, StreamObserver<ResourceCreateResponse> responseObserver) {
        super.createResource(request, responseObserver);
    }

    /** Not implemented in this handler; delegates to the generated base class. */
    @Override
    public void updateResource(ResourceUpdateRequest request, StreamObserver<ResourceUpdateResponse> responseObserver) {
        super.updateResource(request, responseObserver);
    }

    /** Not implemented in this handler; delegates to the generated base class. */
    @Override
    public void deletePreferenceStorage(ResourceDeleteRequest request, StreamObserver<Empty> responseObserver) {
        super.deletePreferenceStorage(request, responseObserver);
    }

    /**
     * Lists every resource accessible to the calling user.
     *
     * @param request          carries the auth token
     * @param responseObserver receives the list of resources or an error
     */
    @Override
    public void searchResource(ResourceSearchRequest request, StreamObserver<ResourceSearchResponse> responseObserver) {
        User callUser = getUser(request.getAuthToken());

        List<Record> records = this.neo4JConnector.searchNodes(
                ACCESSIBLE_RESOURCE_MATCH +
                        "where u.userId = '" + escapeCypherLiteral(callUser.getUserId())
                        + "' return distinct res, sp, s");
        try {
            List<GenericResource> genericResourceList = GenericResourceDeserializer.deserializeList(records);
            ResourceSearchResponse.Builder builder = ResourceSearchResponse.newBuilder();
            builder.addAllResources(genericResourceList);
            responseObserver.onNext(builder.build());
            responseObserver.onCompleted();

        } catch (Exception e) {
            logger.error("Errored while searching generic resources; Message: {}", e.getMessage(), e);
            responseObserver.onError(e);
        }
    }

    /**
     * Attaches a metadata key/value pair to a resource on behalf of the calling user.
     *
     * <p>Failures while creating the metadata node are logged and reported through
     * {@code responseObserver.onError}, matching the other handlers in this class.
     *
     * @param request          carries the auth token, the resource id and the metadata entry
     * @param responseObserver receives an empty response on success or an error
     */
    @Override
    public void addResourceMetadata(AddResourceMetadataRequest request, StreamObserver<Empty> responseObserver) {
        User callUser = getUser(request.getAuthToken());
        try {
            this.neo4JConnector.createMetadataNode(ResourceConstants.RESOURCE_LABEL, "resourceId",
                    request.getResourceId(), callUser.getUserId(),
                    request.getMetadata().getKey(), request.getMetadata().getValue());
        } catch (Exception e) {
            logger.error("Errored while adding metadata for resource with id {}", request.getResourceId(), e);
            responseObserver.onError(new Exception("Errored while adding metadata for resource with id "
                    + request.getResourceId() + ". Msg " + e.getMessage()));
            return;
        }
        responseObserver.onNext(Empty.getDefaultInstance());
        responseObserver.onCompleted();
    }

    /**
     * Fetches the metadata node of a resource that is shared with a group the calling user
     * belongs to.
     *
     * <p>Responds with the node only when exactly one metadata node is deserialized; any other
     * count is reported as an error.
     *
     * @param request          carries the auth token and the resource id
     * @param responseObserver receives the metadata node or an error
     */
    @Override
    public void fetchResourceMetadata(FetchResourceMetadataRequest request, StreamObserver<FetchResourceMetadataResponse> responseObserver) {
        User callUser = getUser(request.getAuthToken());

        String userIdLiteral = escapeCypherLiteral(callUser.getUserId());
        String resourceIdLiteral = escapeCypherLiteral(request.getResourceId());

        // Relationship types need the ':' prefix; a bare identifier inside [] is a variable name
        // and would match a relationship of any type, bypassing the membership/sharing check.
        List<Record> records = neo4JConnector.searchNodes("match (u:User)-[:MEMBER_OF]->(g:Group)<-[:SHARED_WITH]-(res:Resource)-[r:HAS_METADATA]->(m:Metadata) " +
                "where u.userId ='" + userIdLiteral + "' and res.resourceId = '" + resourceIdLiteral + "' return distinct m");
        try {
            List<MetadataNode> metadataNodes = MetadataDeserializer.deserializeList(records);
            if (metadataNodes.size() == 1) {
                responseObserver.onNext(FetchResourceMetadataResponse.newBuilder().setMetadataNode(metadataNodes.get(0)).build());
                responseObserver.onCompleted();
            } else {
                logger.error("No metadata entry for resource {}", request.getResourceId());
                responseObserver.onError(new Exception("No metadata entry for resource " + request.getResourceId()));
            }
        } catch (Exception e) {
            logger.error("Errored while fetching metadata for resource with id {}", request.getResourceId(), e);
            responseObserver.onError(new Exception("Errored while fetching metadata for resource with id "
                    + request.getResourceId() + ". Msg " + e.getMessage()));
        }
    }
}
