/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
 * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
 * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
 * for the specific language governing permissions and limitations under the License.
 */
package org.apache.phoenix.end2end;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.util.Properties;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.hbase.HConstants;
import org.apache.phoenix.util.IndexScrutiny;
import org.apache.phoenix.util.SchemaUtil;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Integration tests that verify queries do not silently surface partial scanner results
 * (rows that appear truncated or carry unexpected NULL values) to the client.
 *
 * <p>Each test method gets its own uniquely named data table and index, created in
 * {@link #setup()}.
 */
public class PartialScannerResultsDisabledIT extends ParallelStatsDisabledIT {
    public static final String TEST_TABLE_DDL =
            "CREATE TABLE IF NOT EXISTS %s\n" + "(\n" + "    ORGANIZATION_ID CHAR(15) NOT NULL,\n"
                    + "    FEED_ELEMENT_ID CHAR(15) NOT NULL,\n"
                    + "    CONTAINER_ID CHAR(15) NOT NULL,\n"
                    + "    FEED_TYPE VARCHAR(1) NOT NULL, \n"
                    + "    NETWORK_ID CHAR(15) NOT NULL,\n" + "    USER_ID CHAR(15) NOT NULL,\n"
                    + "    CREATED_TIME TIMESTAMP,\n" + "    LAST_UPDATE TIMESTAMP,\n"
                    + "    RELEVANCE_SCORE DOUBLE,\n" + "    FEED_ITEM_TYPE VARCHAR(1),\n"
                    + "    FEED_ELEMENT_TYPE VARCHAR(1),\n"
                    + "    FEED_ELEMENT_IS_SYS_GEN BOOLEAN,\n"
                    + "    FEED_ELEMENT_STATUS VARCHAR(1),\n"
                    + "    FEED_ELEMENT_VISIBILITY VARCHAR(1),\n" + "    PARENT_ID CHAR(15),\n"
                    + "    CREATED_BY CHAR(15),\n" + "    BEST_COMMENT_ID CHAR(15),\n"
                    + "    COMMENT_COUNT INTEGER,\n" + "    CONSTRAINT PK PRIMARY KEY\n" + "    (\n"
                    + "        ORGANIZATION_ID,\n" + "        FEED_ELEMENT_ID,\n"
                    + "        CONTAINER_ID,\n" + "        FEED_TYPE,\n" + "        NETWORK_ID,\n"
                    + "        USER_ID\n" + "    )\n" + ") COLUMN_ENCODED_BYTES = 0";

    public static final String INDEX_1_DDL =
            "CREATE INDEX IF NOT EXISTS %s\n" + "ON %s (\n" + "    NETWORK_ID,\n"
                    + "    CONTAINER_ID,\n" + "    FEED_TYPE,\n" + "    USER_ID,\n"
                    + "    CREATED_TIME DESC,\n" + "    FEED_ELEMENT_ID DESC,\n"
                    + "    CREATED_BY\n" + ") "
                    + "    INCLUDE (\n" + "    FEED_ITEM_TYPE,\n"
                    + "    FEED_ELEMENT_TYPE,\n" + "    FEED_ELEMENT_IS_SYS_GEN,\n"
                    + "    FEED_ELEMENT_STATUS,\n" + "    FEED_ELEMENT_VISIBILITY,\n"
                    + "    PARENT_ID,\n" + "    BEST_COMMENT_ID,\n" + "    COMMENT_COUNT\n" + ")";

    private static final String UPSERT_INTO_DATA_TABLE =
            "UPSERT INTO %s\n" + "(\n" + "    ORGANIZATION_ID,\n" + "    FEED_ELEMENT_ID,\n"
                    + "    CONTAINER_ID,\n" + "    FEED_TYPE,\n" + "    NETWORK_ID,\n"
                    + "    USER_ID,\n" + "    CREATED_TIME,\n" + "    LAST_UPDATE,\n"
                    + "    FEED_ITEM_TYPE,\n" + "    FEED_ELEMENT_TYPE,\n"
                    + "    FEED_ELEMENT_IS_SYS_GEN,\n" + "    FEED_ELEMENT_STATUS,\n"
                    + "    FEED_ELEMENT_VISIBILITY,\n" + "    PARENT_ID,\n" + "    CREATED_BY,\n"
                    + "    BEST_COMMENT_ID,\n" + "    COMMENT_COUNT\n" + ")"
                    + "VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";

    private static final Logger LOGGER =
            LoggerFactory.getLogger(PartialScannerResultsDisabledIT.class);

    /** Rows written per commit by {@link #testWithEnoughData()}. */
    private static final int BATCH_SIZE = 100;
    /** Number of commits issued by {@link #testWithEnoughData()}. */
    private static final int NUM_BATCHES = 20;

    /** Seeded source for the integer COMMENT_COUNT column. */
    private static final Random random = new Random(1);
    /** Seeded source for the random string columns. */
    private static final Random sourceOfRandomness = new Random(0);
    /** Monotonic counter used to derive a distinct primary-key prefix for every upserted row. */
    private static final AtomicInteger upsertIdCounter = new AtomicInteger(1);

    // Table names are regenerated for every test method, so they are per-instance state.
    private String dataTableName;
    private String indexTableName;
    private String schemaName;
    private String dataTableFullName;
    private String indexTableFullName;

    /**
     * Generates fresh table names and creates the data table and its index.
     *
     * @throws Exception if either DDL statement fails
     */
    @Before
    public void setup() throws Exception {
        generateUniqueTableNames();
        createTestTable(getUrl(), String.format(TEST_TABLE_DDL, dataTableFullName));
        createTestTable(getUrl(), String.format(INDEX_1_DDL, indexTableName, dataTableFullName));
    }

    /**
     * Writes {@code BATCH_SIZE * NUM_BATCHES} rows and scrutinizes the index against the data
     * table, expecting the scrutiny to report exactly that many rows.
     *
     * @throws Exception if writing or scrutiny fails
     */
    @Test
    public void testWithEnoughData() throws Exception {
        try (Connection conn = DriverManager.getConnection(getUrl())) {
            // Write enough data to trigger partial scanner results.
            // TODO: it's likely that less data could be written if whatever
            // config parameters decide this are lowered.
            writeSingleBatch(conn, BATCH_SIZE, NUM_BATCHES, dataTableFullName);
            LOGGER.info("Running scrutiny");
            // Scrutinize the index to see if partial results are silently returned.
            // In that case we'd get a false positive on the scrutiny run.
            long rowCount =
                    IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
            assertEquals((long) BATCH_SIZE * NUM_BATCHES, rowCount);
        }
    }

    /**
     * Runs a simple select with a fetch size larger than the result set while the client's
     * scanner max-result-size is set to a very small value. Under those settings the scan may
     * start producing partial results, which from Phoenix's perspective would show up as rows
     * with NULL values. The test fails if any row has a NULL value column or if the number of
     * returned rows differs from the number written.
     *
     * @throws SQLException if any JDBC operation fails
     */
    @Test
    public void partialResultDuringSelect () throws SQLException {
        String tableName = generateUniqueName();
        Properties props = new Properties();
        props.setProperty(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, "5");
        int numRecords = 10;
        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
            try (Statement ddl = conn.createStatement()) {
                ddl.execute("CREATE TABLE " + tableName
                        + " (PK1 INTEGER NOT NULL PRIMARY KEY, KV1 VARCHAR)");
            }

            String upsert = "UPSERT INTO " + tableName + " VALUES (?, ?)";
            try (PreparedStatement stmt = conn.prepareStatement(upsert)) {
                for (int i = 0; i < numRecords; i++) {
                    stmt.setInt(1, i);
                    stmt.setString(2, UUID.randomUUID().toString());
                    stmt.executeUpdate();
                }
            }
            conn.commit();

            String sql = "SELECT * FROM " + tableName;
            int count = 0;
            try (Statement s = conn.createStatement()) {
                // Ask for more rows per fetch than the table holds.
                s.setFetchSize(100);
                try (ResultSet rs = s.executeQuery(sql)) {
                    while (rs.next()) {
                        if (rs.getString(2) == null) {
                            fail("Null value because of partial row scan");
                        }
                        count++;
                    }
                }
            }
            // Every upserted row must come back exactly once.
            assertEquals(numRecords, count);
        }

    }

    /**
     * Returns a random alphabetic string of the requested length, drawn from the supplied
     * random source.
     *
     * @param length number of characters to generate
     * @param random source of randomness to draw from
     * @return a string of {@code length} letters
     */
    private static String randString(int length, Random random) {
        // start == end == 0 with letters-only selects alphabetic characters, matching
        // randomAlphabetic(length) but honoring the caller-provided Random.
        return RandomStringUtils.random(length, 0, 0, true, false, null, random);
    }

    /**
     * Upserts {@code numBatches * batchSize} rows into {@code tableName}, committing once per
     * batch. The first three primary-key columns of each row are set to a value taken from a
     * shared counter, so every row written through this method has a distinct key.
     *
     * @param connection connection used for the upserts and commits
     * @param batchSize number of rows written before each commit
     * @param numBatches number of commits to perform
     * @param tableName table to upsert into; substituted into the upsert statement
     * @throws Exception if preparing, executing or committing fails
     */
    public static void writeSingleBatch(Connection connection, int batchSize, int numBatches, String tableName) throws Exception {
        String upsertSql = String.format(UPSERT_INTO_DATA_TABLE, tableName);
        for (int j = 0; j < numBatches; j++) {
            try (PreparedStatement statement = connection.prepareStatement(upsertSql)) {
                for (int i = 0; i < batchSize; i++) {
                    int index = 0;
                    String id = "" + upsertIdCounter.getAndIncrement();
                    statement.setString(++index, id); // ORGANIZATION_ID
                    statement.setString(++index, id); // FEED_ELEMENT_ID
                    statement.setString(++index, id); // CONTAINER_ID
                    statement.setString(++index, randString(1, sourceOfRandomness)); // FEED_TYPE
                    statement.setString(++index, randString(15, sourceOfRandomness)); // NETWORK_ID
                    statement.setString(++index, randString(15, sourceOfRandomness)); // USER_ID
                    statement.setTimestamp(++index, new Timestamp(System.currentTimeMillis())); // CREATED_TIME
                    statement.setTimestamp(++index, new Timestamp(System.currentTimeMillis())); // LAST_UPDATE
                    statement.setString(++index, randString(1, sourceOfRandomness)); // FEED_ITEM_TYPE
                    statement.setString(++index, randString(1, sourceOfRandomness)); // FEED_ELEMENT_TYPE
                    statement.setBoolean(++index, false); // FEED_ELEMENT_IS_SYS_GEN
                    statement.setString(++index, randString(1, sourceOfRandomness)); // FEED_ELEMENT_STATUS
                    statement.setString(++index, randString(1, sourceOfRandomness)); // FEED_ELEMENT_VISIBILITY
                    statement.setString(++index, randString(15, sourceOfRandomness)); // PARENT_ID
                    statement.setString(++index, randString(15, sourceOfRandomness)); // CREATED_BY
                    statement.setString(++index, randString(15, sourceOfRandomness)); // BEST_COMMENT_ID
                    statement.setInt(++index, random.nextInt()); // COMMENT_COUNT
                    statement.execute();
                }
                connection.commit();
            }
        }
    }

    /** Assigns fresh schema, data-table and index names for the current test method. */
    private void generateUniqueTableNames() {
        schemaName = generateUniqueName();
        dataTableName = generateUniqueName() + "_DATA";
        dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
        indexTableName = generateUniqueName() + "_IDX";
        indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
    }

}
