/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.processors.mongodb;

import com.mongodb.client.MongoCursor;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.mongodb.MongoDBClientService;
import org.apache.nifi.mongodb.MongoDBControllerService;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.MockProcessContext;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.bson.Document;
import org.bson.types.ObjectId;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertEquals;

public class PutMongoIT extends MongoWriteTestBase {
    /**
     * Runs before each test and delegates to the base-class {@code setup(Class)},
     * passing {@link PutMongo} as the class argument.
     */
    @BeforeEach
    public void setup() {
        super.setup(PutMongo.class);
    }

    /**
     * Runs after each test and delegates to the base-class {@code teardown()}.
     */
    @Override
    @AfterEach
    public void teardown() {
        super.teardown();
    }

    /**
     * Renders a document as JSON text and encodes that text as UTF-8.
     *
     * @param doc the document to serialize
     * @return the UTF-8 bytes of {@code doc.toJson()}
     */
    private byte[] documentToByteArray(Document doc) {
        final String json = doc.toJson();
        return json.getBytes(StandardCharsets.UTF_8);
    }

    /**
     * Walks a freshly created {@link PutMongo} runner through three configurations and
     * checks the validation results reported for each of them.
     */
    @Test
    public void testValidators() {
        final TestRunner testRunner = TestRunners.newTestRunner(PutMongo.class);

        // nothing configured: database name and collection name are both reported
        testRunner.enqueue(new byte[0]);
        Collection<ValidationResult> outcome = validationResultsOf(testRunner);
        assertEquals(2, outcome.size());
        final Iterator<ValidationResult> reported = outcome.iterator();
        final String firstMessage = reported.next().toString();
        Assertions.assertTrue(firstMessage.contains("is invalid because Mongo Database Name is required"));
        final String secondMessage = reported.next().toString();
        Assertions.assertTrue(secondMessage.contains("is invalid because Mongo Collection Name is required"));

        // connection details supplied, but the write concern is not an allowed value
        testRunner.setProperty(AbstractMongoProcessor.URI, MONGO_URI);
        testRunner.setProperty(AbstractMongoProcessor.DATABASE_NAME, DATABASE_NAME);
        testRunner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, COLLECTION_NAME);
        testRunner.setProperty(PutMongo.WRITE_CONCERN, "xyz");
        testRunner.setProperty(PutMongo.UPDATE_QUERY_KEY, "_id");
        testRunner.enqueue(new byte[0]);
        outcome = validationResultsOf(testRunner);
        assertEquals(1, outcome.size());
        final String writeConcernMessage = outcome.iterator().next().toString();
        Assertions.assertTrue(writeConcernMessage.matches("'Write Concern' .* is invalid because Given value not found in allowed set .*"));

        // an allowed write concern leaves nothing to report
        testRunner.setProperty(PutMongo.WRITE_CONCERN, PutMongo.WRITE_CONCERN_UNACKNOWLEDGED);
        testRunner.enqueue(new byte[0]);
        outcome = validationResultsOf(testRunner);
        assertEquals(0, outcome.size());
    }

    /**
     * Fetches the runner's process context and, when it is a {@link MockProcessContext},
     * returns the result of its {@code validate()} call; otherwise returns an empty set.
     *
     * @param runner the runner whose current configuration is validated
     * @return the validation results, or an empty set when the context is not a mock context
     */
    private Collection<ValidationResult> validationResultsOf(TestRunner runner) {
        final ProcessContext context = runner.getProcessContext();
        if (!(context instanceof MockProcessContext)) {
            return new HashSet<>();
        }
        return ((MockProcessContext) context).validate();
    }

    /**
     * Update mode with both an update query key and an update query configured
     * is expected to be reported as not valid.
     */
    @Test
    public void testQueryAndUpdateKey() {
        final TestRunner testRunner = init(PutMongo.class);

        testRunner.setProperty(PutMongo.MODE, PutMongo.MODE_UPDATE);
        // configure the key and the query at the same time
        testRunner.setProperty(PutMongo.UPDATE_QUERY_KEY, "_id");
        testRunner.setProperty(PutMongo.UPDATE_QUERY, "{}");

        testRunner.assertNotValid();
    }

    /**
     * Update mode with the update query removed and an empty update query key
     * is expected to be reported as not valid.
     */
    @Test
    public void testNoQueryAndNoUpdateKey() {
        final TestRunner testRunner = init(PutMongo.class);

        testRunner.setProperty(PutMongo.MODE, PutMongo.MODE_UPDATE);
        // neither a query nor a usable key remains configured
        testRunner.removeProperty(PutMongo.UPDATE_QUERY);
        testRunner.setProperty(PutMongo.UPDATE_QUERY_KEY, "");

        testRunner.assertNotValid();
    }

    /**
     * An update query key consisting only of whitespace is expected to be reported as not valid.
     */
    @Test
    public void testBlankUpdateKey() {
        final TestRunner testRunner = init(PutMongo.class);

        // two spaces, nothing else
        testRunner.setProperty(PutMongo.UPDATE_QUERY_KEY, "  ");

        testRunner.assertNotValid();
    }

    /**
     * Applies an operator-style update to a pre-inserted document, with the update query
     * resolved from the {@code mongo.update.query} flowfile attribute.
     */
    @Test
    public void testUpdateQuery() {
        final TestRunner testRunner = init(PutMongo.class);

        final Document seed = new Document();
        seed.append("name", "John Smith");
        seed.append("department", "Engineering");
        collection.insertOne(seed);

        // flowfile content: the update operators, one JSON line per element
        final String operators = String.join("\n",
            "{",
            "\t\"$set\": {",
            "\t\t\"email\": \"john.smith@test.com\",",
            "\t\t\"grade\": \"Sr. Principle Eng.\"",
            "\t},",
            "\t\"$inc\": {",
            "\t\t\"writes\": 1",
            "\t}",
            "}");

        // the JSON form of the seed document doubles as the update query
        final Map<String, String> attributes = new HashMap<>();
        attributes.put("mongo.update.query", seed.toJson());

        testRunner.setProperty(PutMongo.UPDATE_MODE, PutMongo.UPDATE_WITH_OPERATORS);
        testRunner.setProperty(PutMongo.MODE, PutMongo.MODE_UPDATE);
        testRunner.setProperty(PutMongo.UPDATE_QUERY, "${mongo.update.query}");
        testRunner.setValidateExpressionUsage(true);
        testRunner.enqueue(operators, attributes);

        updateTests(testRunner, seed);
    }

    /**
     * Applies an operator-style update to a pre-inserted document, with {@code name}
     * configured as the update query key.
     */
    @Test
    public void testUpdateBySimpleKey() {
        final TestRunner testRunner = init(PutMongo.class);

        final Document seed = new Document();
        seed.append("name", "John Smith");
        seed.append("department", "Engineering");
        collection.insertOne(seed);

        // flowfile content: the key field followed by the update operators
        final String operators = String.join("\n",
            "{",
            "\t\"name\": \"John Smith\",",
            "\t\"$set\": {",
            "\t\t\"email\": \"john.smith@test.com\",",
            "\t\t\"grade\": \"Sr. Principle Eng.\"",
            "\t},",
            "\t\"$inc\": {",
            "\t\t\"writes\": 1",
            "\t}",
            "}");

        testRunner.setProperty(PutMongo.UPDATE_QUERY_KEY, "name");
        testRunner.setProperty(PutMongo.UPDATE_MODE, PutMongo.UPDATE_WITH_OPERATORS);
        testRunner.setProperty(PutMongo.MODE, PutMongo.MODE_UPDATE);
        testRunner.setValidateExpressionUsage(true);
        testRunner.enqueue(operators);

        updateTests(testRunner, seed);
    }

    /**
     * Runs the shared full-document update scenario with {@code name,department}
     * configured as the update query key.
     */
    @Test
    public void testUpdateWithFullDocByKeys() {
        final TestRunner testRunner = init(PutMongo.class);

        // two comma-separated key fields
        testRunner.setProperty(PutMongo.UPDATE_QUERY_KEY, "name,department");

        testUpdateFullDocument(testRunner);
    }

    /**
     * Runs the shared full-document update scenario with an explicit update query
     * that matches on {@code name}.
     */
    @Test
    public void testUpdateWithFullDocByQuery() {
        final TestRunner testRunner = init(PutMongo.class);

        // literal JSON query instead of an update key
        testRunner.setProperty(PutMongo.UPDATE_QUERY, "{ \"name\": \"John Smith\"}");

        testUpdateFullDocument(testRunner);
    }

    /**
     * Shared scenario for the full-document update tests: inserts a two-field document,
     * sends a replacement body that adds a {@code contacts} sub-document, runs the processor
     * in update mode with {@code UPDATE_WITH_DOC}, and verifies the stored result.
     *
     * @param runner a runner on which the caller has already configured how the target
     *               document is located (update key or update query)
     */
    private void testUpdateFullDocument(TestRunner runner) {
        Document document = new Document()
                .append("name", "John Smith")
                .append("department", "Engineering");
        collection.insertOne(document);
        String updateBody = "{\n" +
                "\t\"name\": \"John Smith\",\n" +
                "\t\"department\": \"Engineering\",\n" +
                "\t\"contacts\": {\n" +
                "\t\t\"phone\": \"555-555-5555\",\n" +
                "\t\t\"email\": \"john.smith@test.com\",\n" +
                "\t\t\"twitter\": \"@JohnSmith\"\n" +
                "\t}\n" +
                "}";
        runner.setProperty(PutMongo.UPDATE_MODE, PutMongo.UPDATE_WITH_DOC);
        runner.setProperty(PutMongo.MODE, PutMongo.MODE_UPDATE);
        runner.setValidateExpressionUsage(true);
        runner.enqueue(updateBody);
        runner.run();
        runner.assertTransferCount(PutMongo.REL_FAILURE, 0);
        runner.assertTransferCount(PutMongo.REL_SUCCESS, 1);

        // The cursor is closeable; try-with-resources releases it even when an assertion fails.
        try (MongoCursor<Document> cursor = collection.find(document).iterator()) {
            // Fail with a readable message instead of a NoSuchElementException.
            Assertions.assertTrue(cursor.hasNext(), "Document did not come back.");
            Document found = cursor.next();

            // JUnit 5 argument order is (expected, actual).
            assertEquals(document.get("name"), found.get("name"));
            assertEquals(document.get("department"), found.get("department"));
            Document contacts = (Document) found.get("contacts");
            Assertions.assertNotNull(contacts);
            assertEquals("@JohnSmith", contacts.get("twitter"));
            assertEquals("john.smith@test.com", contacts.get("email"));
            assertEquals("555-555-5555", contacts.get("phone"));
        }
        assertEquals(1, collection.count(document));
    }

    /**
     * Applies an operator-style update where the update query key consists of two dotted
     * paths into the {@code contacts} sub-document, then verifies that the {@code $set}
     * and {@code $inc} operators were applied to the stored document.
     */
    @Test
    public void testUpdateByComplexKey() {
        TestRunner runner = init(PutMongo.class);
        Document document = new Document()
                .append("name", "John Smith")
                .append("department", "Engineering")
                .append("contacts", new Document().append("email", "john.smith@test.com")
                .append("phone", "555-555-5555"));
        collection.insertOne(document);
        String updateBody = "{\n" +
                "\t\"contacts.phone\": \"555-555-5555\",\n" +
                "\t\"contacts.email\": \"john.smith@test.com\",\n" +
                "\t\"$set\": {\n" +
                "\t\t\"contacts.twitter\": \"@JohnSmith\"\n" +
                "\t},\n" +
                "\t\"$inc\": {\n" +
                "\t\t\"writes\": 1\n" +
                "\t}\n" +
                "}";
        runner.setProperty(PutMongo.UPDATE_QUERY_KEY, "contacts.phone,contacts.email");
        runner.setProperty(PutMongo.UPDATE_MODE, PutMongo.UPDATE_WITH_OPERATORS);
        runner.setProperty(PutMongo.MODE, PutMongo.MODE_UPDATE);
        runner.setValidateExpressionUsage(true);
        runner.enqueue(updateBody);
        runner.run();
        runner.assertTransferCount(PutMongo.REL_FAILURE, 0);
        runner.assertTransferCount(PutMongo.REL_SUCCESS, 1);

        // The cursor is closeable; try-with-resources releases it even when an assertion fails.
        try (MongoCursor<Document> iterator = collection.find(new Document("name", "John Smith")).iterator()) {
            Assertions.assertTrue(iterator.hasNext(), "Document did not come back.");
            Document val = iterator.next();
            Map<?, ?> contacts = (Map<?, ?>) val.get("contacts");
            Assertions.assertNotNull(contacts);
            // assertEquals reports the actual value on failure; a missing key shows up as null.
            assertEquals("@JohnSmith", contacts.get("twitter"));
            assertEquals(1, val.get("writes"));
        }
    }

    /**
     * Runs the processor once and verifies that the operator update sent by the caller
     * was applied: one flowfile on success, none on failure, and the stored document
     * carries the expected {@code email}, {@code grade} and {@code writes} values.
     *
     * @param runner   a fully configured runner with the update flowfile already enqueued
     * @param document filter used to read the updated document back from the collection
     */
    private void updateTests(TestRunner runner, Document document) {
        runner.run();
        runner.assertTransferCount(PutMongo.REL_FAILURE, 0);
        runner.assertTransferCount(PutMongo.REL_SUCCESS, 1);

        // The cursor is closeable; try-with-resources releases it even when an assertion fails.
        try (MongoCursor<Document> iterator = collection.find(document).iterator()) {
            Assertions.assertTrue(iterator.hasNext(), "Document did not come back.");
            Document val = iterator.next();
            // assertEquals reports the actual value on failure; a missing key shows up as null.
            assertEquals("john.smith@test.com", val.get("email"));
            assertEquals("Sr. Principle Eng.", val.get("grade"));
            assertEquals(1, val.get("writes"));
        }
    }

    /**
     * Sends one fixture document through the processor without setting a mode and checks
     * that it reaches the success relationship unchanged and is the only document stored.
     */
    @Test
    public void testInsertOne() throws Exception {
        final TestRunner testRunner = init(PutMongo.class);
        testRunner.setProperty(PutMongo.UPDATE_QUERY_KEY, "_id");

        final Document expected = DOCUMENTS.get(0);
        final byte[] payload = documentToByteArray(expected);
        testRunner.enqueue(payload);
        testRunner.run();

        // flowfile content must pass through untouched
        testRunner.assertAllFlowFilesTransferred(PutMongo.REL_SUCCESS, 1);
        final List<MockFlowFile> succeeded = testRunner.getFlowFilesForRelationship(PutMongo.REL_SUCCESS);
        succeeded.get(0).assertContentEquals(payload);

        // exactly one stored document, equal to the fixture
        assertEquals(1, collection.count());
        assertEquals(expected, collection.find().first());
    }

    /**
     * Sends every fixture document through the processor and checks that three flowfiles
     * succeed, in fixture order, and that three documents end up in the collection.
     */
    @Test
    public void testInsertMany() throws Exception {
        final TestRunner testRunner = init(PutMongo.class);
        testRunner.setProperty(PutMongo.UPDATE_QUERY_KEY, "_id");

        DOCUMENTS.forEach(fixture -> testRunner.enqueue(documentToByteArray(fixture)));
        testRunner.run(3);

        testRunner.assertAllFlowFilesTransferred(PutMongo.REL_SUCCESS, 3);
        final List<MockFlowFile> succeeded = testRunner.getFlowFilesForRelationship(PutMongo.REL_SUCCESS);
        int position = 0;
        for (MockFlowFile flowFile : succeeded) {
            // n-th successful flowfile carries the n-th fixture
            flowFile.assertContentEquals(DOCUMENTS.get(position).toJson());
            position++;
        }

        // all three fixtures were stored
        assertEquals(3, collection.count());
    }

    /**
     * Pre-inserts the first fixture document, then sends all fixtures through the processor:
     * the first is expected on the failure relationship, the remaining two on success.
     */
    @Test
    public void testInsertWithDuplicateKey() throws Exception {
        final TestRunner testRunner = init(PutMongo.class);
        testRunner.setProperty(PutMongo.UPDATE_QUERY_KEY, "_id");

        // the first fixture is already stored before the processor runs
        collection.insertOne(DOCUMENTS.get(0));

        DOCUMENTS.forEach(fixture -> testRunner.enqueue(documentToByteArray(fixture)));
        testRunner.run(3);

        // the pre-inserted fixture is the one that fails
        testRunner.assertTransferCount(PutMongo.REL_FAILURE, 1);
        final List<MockFlowFile> failed = testRunner.getFlowFilesForRelationship(PutMongo.REL_FAILURE);
        failed.get(0).assertContentEquals(documentToByteArray(DOCUMENTS.get(0)));

        // the other two succeed, in fixture order starting at the second fixture
        testRunner.assertTransferCount(PutMongo.REL_SUCCESS, 2);
        final List<MockFlowFile> succeeded = testRunner.getFlowFilesForRelationship(PutMongo.REL_SUCCESS);
        int fixtureIndex = 1;
        for (MockFlowFile flowFile : succeeded) {
            flowFile.assertContentEquals(DOCUMENTS.get(fixtureIndex).toJson());
            fixtureIndex++;
        }

        // one pre-inserted plus two newly inserted
        assertEquals(3, collection.count());
    }

    /**
     * Verifies that 'update' does not insert if 'upsert' is false.
     * @see #testUpsert()
     */
    @Test
    public void testUpdateDoesNotInsert() throws Exception {
        final TestRunner testRunner = init(PutMongo.class);
        testRunner.setProperty(PutMongo.UPDATE_QUERY_KEY, "_id");

        final byte[] payload = documentToByteArray(DOCUMENTS.get(0));

        // update mode, upsert property left untouched
        testRunner.setProperty(PutMongo.MODE, "update");
        testRunner.enqueue(payload);
        testRunner.run();

        testRunner.assertAllFlowFilesTransferred(PutMongo.REL_SUCCESS, 1);
        final List<MockFlowFile> succeeded = testRunner.getFlowFilesForRelationship(PutMongo.REL_SUCCESS);
        succeeded.get(0).assertContentEquals(payload);

        // the collection started empty and must still be empty
        assertEquals(0, collection.count());
    }

    /**
     * Verifies that 'update' inserts the document when 'upsert' is set to true.
     * @see #testUpdateDoesNotInsert()
     */
    @Test
    public void testUpsert() throws Exception {
        final TestRunner testRunner = init(PutMongo.class);
        testRunner.setProperty(PutMongo.UPDATE_QUERY_KEY, "_id");

        final Document expected = DOCUMENTS.get(0);
        final byte[] payload = documentToByteArray(expected);

        // update mode with upsert switched on
        testRunner.setProperty(PutMongo.MODE, "update");
        testRunner.setProperty(PutMongo.UPSERT, "true");
        testRunner.enqueue(payload);
        testRunner.run();

        testRunner.assertAllFlowFilesTransferred(PutMongo.REL_SUCCESS, 1);
        final List<MockFlowFile> succeeded = testRunner.getFlowFilesForRelationship(PutMongo.REL_SUCCESS);
        succeeded.get(0).assertContentEquals(payload);

        // exactly one stored document, equal to the fixture
        assertEquals(1, collection.count());
        assertEquals(expected, collection.find().first());
    }

    /**
     * Same as the plain upsert scenario, but the flowfile content is the JSON form of the
     * inherited {@code oidDocument} fixture.
     */
    @Test
    public void testUpsertWithOid() throws Exception {
        final TestRunner testRunner = init(PutMongo.class);
        testRunner.setProperty(PutMongo.UPDATE_QUERY_KEY, "_id");

        final byte[] payload = documentToByteArray(oidDocument);

        // update mode with upsert switched on
        testRunner.setProperty(PutMongo.MODE, "update");
        testRunner.setProperty(PutMongo.UPSERT, "true");
        testRunner.enqueue(payload);
        testRunner.run();

        testRunner.assertAllFlowFilesTransferred(PutMongo.REL_SUCCESS, 1);
        final List<MockFlowFile> succeeded = testRunner.getFlowFilesForRelationship(PutMongo.REL_SUCCESS);
        succeeded.get(0).assertContentEquals(payload);

        // exactly one stored document, equal to the fixture
        assertEquals(1, collection.count());
        assertEquals(oidDocument, collection.find().first());
    }

    /**
     * Pre-inserts a fixture document, changes it locally (two fields added, field {@code c}
     * removed), sends the changed version in update mode, and checks that the stored
     * document equals the changed version.
     */
    @Test
    public void testUpdate() throws Exception {
        final TestRunner testRunner = init(PutMongo.class);
        testRunner.setProperty(PutMongo.UPDATE_QUERY_KEY, "_id");

        final Document target = DOCUMENTS.get(0);

        // store the original version first
        collection.insertOne(target);

        // then alter the in-memory copy that will be sent as the update
        target.put("abc", "123");
        target.put("xyz", "456");
        target.remove("c");
        final byte[] payload = documentToByteArray(target);

        testRunner.setProperty(PutMongo.MODE, "update");
        testRunner.enqueue(payload);
        testRunner.run();

        testRunner.assertAllFlowFilesTransferred(PutMongo.REL_SUCCESS, 1);
        final List<MockFlowFile> succeeded = testRunner.getFlowFilesForRelationship(PutMongo.REL_SUCCESS);
        succeeded.get(0).assertContentEquals(payload);

        // still one document, now matching the altered version
        assertEquals(1, collection.count());
        assertEquals(target, collection.find().first());
    }

    /**
     * Sends the same {@code $push} upsert three times in update mode with upsert enabled and
     * verifies that a single document with {@code _id} "Test" holds three pushed entries,
     * each with {@code msg} equal to "Hi".
     */
    @Test
    public void testUpsertWithOperators() throws Exception {
        TestRunner runner = init(PutMongo.class);
        String upsert = "{\n" +
                "  \"_id\": \"Test\",\n" +
                "  \"$push\": {\n" +
                "     \"testArr\": { \"msg\": \"Hi\" }\n" +
                "  }\n" +
                "}";
        runner.setProperty(PutMongo.UPDATE_MODE, PutMongo.UPDATE_WITH_OPERATORS);
        runner.setProperty(PutMongo.UPDATE_QUERY_KEY, "_id");
        runner.setProperty(PutMongo.MODE, "update");
        runner.setProperty(PutMongo.UPSERT, "true");
        for (int x = 0; x < 3; x++) {
            // Explicit charset so the bytes do not depend on the platform default.
            runner.enqueue(upsert.getBytes(StandardCharsets.UTF_8));
        }
        runner.run(3, true, true);
        runner.assertTransferCount(PutMongo.REL_FAILURE, 0);
        runner.assertTransferCount(PutMongo.REL_SUCCESS, 3);

        Document query = new Document("_id", "Test");
        Document result = collection.find(query).first();
        // Fail with a message instead of a NullPointerException when nothing was upserted.
        Assertions.assertNotNull(result, "Result was null");
        List<?> array = (List<?>) result.get("testArr");
        Assertions.assertNotNull(array, "Array was empty");
        assertEquals(3, array.size(), "Wrong size");
        for (Object element : array) {
            Document doc = (Document) element;
            String msg = doc.getString("msg");
            // JUnit 5 takes the value first and the failure message last.
            Assertions.assertNotNull(msg, "Msg was null");
            assertEquals("Hi", msg, "Msg had wrong value");
        }
    }

    /*
     * Start NIFI-4759 Regression Tests
     *
     * 2 issues with ID field:
     *
     * * Assumed _id is the update key, causing failures when the user configured a different one in the UI.
     * * Treated _id as a string even when it is an ObjectID sent from another processor as a string value.
     *
     * Expected behavior:
     *
     * * update key field should work no matter what (legal) value it is set to be.
     * * _ids that are ObjectID should become real ObjectIDs when added to Mongo.
     * * _ids that are arbitrary strings should still go in as strings.
     *
     */
    @Test
    public void testNiFi_4759_Regressions() {
        final TestRunner testRunner = init(PutMongo.class);

        // Three parallel arrays, one entry per case: flowfile body, configured update key,
        // and the value that key is expected to hold in the stored document.
        final String[] bodies = {
                "{ \"_id\": \"12345\", \"$set\": { \"msg\": \"Hello, world\" } }",
                "{ \"_id\": \"5a5617b9c1f5de6d8276e87d\", \"$set\": { \"msg\": \"Hello, world\" } }",
                "{ \"updateKey\": \"12345\", \"$set\": { \"msg\": \"Hello, world\" } }"
        };
        final String[] keyNames = { "_id", "_id", "updateKey" };
        final Object[] keyValues = { "12345", new ObjectId("5a5617b9c1f5de6d8276e87d"), "12345" };

        testRunner.setProperty(PutMongo.UPDATE_MODE, PutMongo.UPDATE_WITH_OPERATORS);
        testRunner.setProperty(PutMongo.MODE, "update");
        testRunner.setProperty(PutMongo.UPSERT, "true");

        // each body is sent this many times; only one document may result per case
        final int copiesPerCase = 2;

        for (int caseIndex = 0; caseIndex < bodies.length; caseIndex++) {
            final String keyName = keyNames[caseIndex];
            testRunner.setProperty(PutMongo.UPDATE_QUERY_KEY, keyName);

            int sent = 0;
            while (sent < copiesPerCase) {
                testRunner.enqueue(bodies[caseIndex]);
                sent++;
            }
            testRunner.run(copiesPerCase, true, true);
            testRunner.assertTransferCount(PutMongo.REL_FAILURE, 0);
            testRunner.assertTransferCount(PutMongo.REL_SUCCESS, copiesPerCase);

            final Document lookup = new Document(keyName, keyValues[caseIndex]);
            final Document stored = collection.find(lookup).first();
            Assertions.assertNotNull(stored, "Result was null");
            assertEquals(1, collection.count(lookup), "Count was wrong");

            // reset transfer counters before the next case
            testRunner.clearTransferState();
        }
    }

    /**
     * Configures the processor to obtain its connection from a {@link MongoDBControllerService}
     * instead of its own URI property, then checks that one flowfile is processed successfully.
     */
    @Test
    public void testClientService() throws Exception {
        final String serviceId = "clientService";
        final MongoDBClientService service = new MongoDBControllerService();
        final TestRunner testRunner = init(PutMongo.class);

        // register the service, move the URI from the processor onto it, then enable it
        testRunner.addControllerService(serviceId, service);
        testRunner.removeProperty(PutMongo.URI);
        testRunner.setProperty(service, MongoDBControllerService.URI, MONGO_URI);
        testRunner.setProperty(PutMongo.CLIENT_SERVICE, serviceId);
        testRunner.enableControllerService(service);
        testRunner.assertValid();

        testRunner.enqueue("{ \"msg\": \"Hello, world\" }");
        testRunner.run();

        testRunner.assertTransferCount(PutMongo.REL_SUCCESS, 1);
        testRunner.assertTransferCount(PutMongo.REL_FAILURE, 0);
    }
}
