/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.phoenix.end2end.index;

import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
import org.apache.phoenix.hbase.index.IndexRegionObserver;
import org.apache.phoenix.hbase.index.Indexer;
import org.apache.phoenix.hbase.index.covered.NonTxIndexBuilder;
import org.apache.phoenix.index.GlobalIndexChecker;
import org.apache.phoenix.index.PhoenixIndexBuilder;
import org.apache.phoenix.index.PhoenixIndexCodec;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.util.SchemaUtil;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

@RunWith(Parameterized.class)
public class IndexCoprocIT extends ParallelStatsDisabledIT {
    private boolean isNamespaceMapped = false;
    private boolean isMultiTenant = false;
    public static final String GLOBAL_INDEX_CHECKER_CONFIG =
        "|org.apache.phoenix.index.GlobalIndexChecker|805306365|";
    public static final String INDEX_REGION_OBSERVER_CONFIG =
        "|org.apache.phoenix.hbase.index.IndexRegionObserver" +
            "|805306366|org.apache.hadoop.hbase.index.codec.class=" +
            "org.apache.phoenix.index.PhoenixIndexCodec," +
            "index.builder=org.apache.phoenix.index.PhoenixIndexBuilder";
    public static final String INDEXER_CONFIG =
        "|org.apache.phoenix.hbase.index.Indexer" +
            "|805306366|org.apache.hadoop.hbase.index.codec.class=" +
            "org.apache.phoenix.index.PhoenixIndexCodec," +
            "index.builder=org.apache.phoenix.index.PhoenixIndexBuilder";

    public IndexCoprocIT(boolean isMultiTenant){
        this.isMultiTenant = isMultiTenant;
    }
    @Parameterized.Parameters(name ="CreateIndexCoprocIT_mulitTenant={0}")
    public static Collection<Object[]> data() {
        return Arrays.asList(new Object[][]{{true}, {false}});
    }

    @Test
    public void testCreateCoprocs() throws Exception {
        String schemaName = "S" + generateUniqueName();
        String tableName = "T_" + generateUniqueName();
        String indexName = "I_" + generateUniqueName();
        String physicalTableName = SchemaUtil.getPhysicalHBaseTableName(schemaName, tableName,
            isNamespaceMapped).getString();
        String physicalIndexName = SchemaUtil.getPhysicalHBaseTableName(schemaName,
            indexName, isNamespaceMapped).getString();
        Admin admin = ((PhoenixConnection) getConnection()).getQueryServices().getAdmin();

        createBaseTable(schemaName, tableName, isMultiTenant, 0, null);
        createIndexTable(schemaName, tableName, indexName);

        HTableDescriptor baseDescriptor = admin.getTableDescriptor(TableName.valueOf(physicalTableName));
        HTableDescriptor indexDescriptor = admin.getTableDescriptor(TableName.valueOf(physicalIndexName));

        assertCoprocsContains(IndexRegionObserver.class, baseDescriptor);
        assertCoprocsContains(GlobalIndexChecker.class, indexDescriptor);

        removeCoproc(IndexRegionObserver.class, baseDescriptor, admin);
        removeCoproc(IndexRegionObserver.class, indexDescriptor, admin);
        removeCoproc(GlobalIndexChecker.class, indexDescriptor, admin);

        Map<String, String> props = new HashMap<String, String>();
        props.put(NonTxIndexBuilder.CODEC_CLASS_NAME_KEY, PhoenixIndexCodec.class.getName());
        Indexer.enableIndexing(baseDescriptor, PhoenixIndexBuilder.class,
            props, QueryServicesOptions.DEFAULT_COPROCESSOR_PRIORITY);
        admin.modifyTable(baseDescriptor.getTableName(), baseDescriptor);
        baseDescriptor = admin.getTableDescriptor(TableName.valueOf(physicalTableName));
        indexDescriptor = admin.getTableDescriptor(TableName.valueOf(physicalIndexName));
        assertUsingOldCoprocs(baseDescriptor, indexDescriptor);

        createBaseTable(schemaName, tableName, true, 0, null);
        baseDescriptor = admin.getTableDescriptor(TableName.valueOf(physicalTableName));
        indexDescriptor = admin.getTableDescriptor(TableName.valueOf(physicalIndexName));
        assertUsingOldCoprocs(baseDescriptor, indexDescriptor);
    }

    @Test
    public void testCreateOnExistingHBaseTable() throws Exception {
        String schemaName = generateUniqueName();
        String tableName = generateUniqueName();
        String indexName = generateUniqueName();
        byte[] cf = Bytes.toBytes("f");
        try (PhoenixConnection conn = getConnection()){
            TableName table = TableName.valueOf(SchemaUtil.getPhysicalHBaseTableName(schemaName,
                tableName, isNamespaceMapped).getString());
            HTableDescriptor originalDesc = new HTableDescriptor(table);
            originalDesc.addFamily(new HColumnDescriptor(cf));
            Admin admin = conn.getQueryServices().getAdmin();
            admin.createTable(originalDesc);
            createBaseTable(schemaName, tableName, isMultiTenant, 0, null);
            HTableDescriptor baseDescriptor = admin.getTableDescriptor(table);
            assertUsingNewCoprocs(baseDescriptor);
            createIndexTable(schemaName, tableName, indexName);
            baseDescriptor = admin.getTableDescriptor(table);
            TableName indexTable = TableName.valueOf(SchemaUtil.getPhysicalHBaseTableName(schemaName,
                indexName, isNamespaceMapped).getString());
            HTableDescriptor indexDescriptor = admin.getTableDescriptor(indexTable);
            assertUsingNewCoprocs(baseDescriptor, indexDescriptor);
        }
    }

    @Test
    public void testAlterDoesntChangeCoprocs() throws Exception {
        String schemaName = "S" + generateUniqueName();
        String tableName = "T_" + generateUniqueName();
        String indexName = "I_" + generateUniqueName();
        String physicalTableName = SchemaUtil.getPhysicalHBaseTableName(schemaName, tableName,
            isNamespaceMapped).getString();
        String physicalIndexName = SchemaUtil.getPhysicalHBaseTableName(schemaName,
            indexName, isNamespaceMapped).getString();
        Admin admin = ((PhoenixConnection) getConnection()).getQueryServices().getAdmin();

        createBaseTable(schemaName, tableName, isMultiTenant, 0, null);
        createIndexTable(schemaName, tableName, indexName);
        HTableDescriptor baseDescriptor = admin.getTableDescriptor(TableName.valueOf(physicalTableName));
        HTableDescriptor indexDescriptor = admin.getTableDescriptor(TableName.valueOf(physicalIndexName));

        assertCoprocsContains(IndexRegionObserver.class, baseDescriptor);
        assertCoprocsContains(GlobalIndexChecker.class, indexDescriptor);
        String columnName = "foo";
        addColumnToBaseTable(schemaName, tableName, columnName);
        assertCoprocsContains(IndexRegionObserver.class, baseDescriptor);
        assertCoprocsContains(GlobalIndexChecker.class, indexDescriptor);
        dropColumnToBaseTable(schemaName, tableName, columnName);
        assertCoprocsContains(IndexRegionObserver.class, baseDescriptor);
        assertCoprocsContains(GlobalIndexChecker.class, indexDescriptor);
    }
    private void assertUsingOldCoprocs(HTableDescriptor baseDescriptor,
                                       HTableDescriptor indexDescriptor) {
        assertCoprocsContains(Indexer.class, baseDescriptor);
        assertCoprocConfig(baseDescriptor, Indexer.class.getName(),
            INDEXER_CONFIG);
        assertCoprocsNotContains(IndexRegionObserver.class, baseDescriptor);
        assertCoprocsNotContains(IndexRegionObserver.class, indexDescriptor);
        assertCoprocsNotContains(GlobalIndexChecker.class, indexDescriptor);
    }

    private void assertUsingNewCoprocs(HTableDescriptor baseDescriptor) {
        assertCoprocsContains(IndexRegionObserver.class, baseDescriptor);
        assertCoprocsNotContains(Indexer.class, baseDescriptor);
    }

    private void assertUsingNewCoprocs(HTableDescriptor baseDescriptor,
                                       HTableDescriptor indexDescriptor) {
        assertCoprocsContains(IndexRegionObserver.class, baseDescriptor);
        assertCoprocConfig(baseDescriptor, IndexRegionObserver.class.getName(),
            INDEX_REGION_OBSERVER_CONFIG);
        assertCoprocsNotContains(Indexer.class, baseDescriptor);
        assertCoprocsNotContains(Indexer.class, indexDescriptor);
        assertCoprocsContains(GlobalIndexChecker.class, indexDescriptor);
        assertCoprocConfig(indexDescriptor, GlobalIndexChecker.class.getName(),
            GLOBAL_INDEX_CHECKER_CONFIG);
    }

    private void assertCoprocsContains(Class clazz, HTableDescriptor descriptor) {
        String expectedCoprocName = clazz.getName();
        boolean foundCoproc = isCoprocPresent(descriptor, expectedCoprocName);
        Assert.assertTrue("Could not find coproc " + expectedCoprocName +
            " in descriptor " + descriptor,foundCoproc);
    }

    private void assertCoprocsNotContains(Class clazz, HTableDescriptor descriptor) {
        String expectedCoprocName = clazz.getName();
        boolean foundCoproc = isCoprocPresent(descriptor, expectedCoprocName);
        Assert.assertFalse("Could find coproc " + expectedCoprocName +
            " in descriptor " + descriptor,foundCoproc);
    }

    private boolean isCoprocPresent(HTableDescriptor descriptor, String expectedCoprocName) {
        boolean foundCoproc = false;
        for (String coprocName : descriptor.getCoprocessors()){
            if (coprocName.equals(expectedCoprocName)){
                foundCoproc = true;
                break;
            }
        }
        return foundCoproc;
    }

    public static void assertCoprocConfig(HTableDescriptor indexDesc,
                                   String className, String expectedConfigValue){
        boolean foundConfig = false;
        for (Map.Entry<ImmutableBytesWritable, ImmutableBytesWritable> entry :
            indexDesc.getValues().entrySet()){
            String propKey = Bytes.toString(entry.getKey().get());
            String propValue = Bytes.toString(entry.getValue().get());
            //Unfortunately, a good API to read coproc properties didn't show up until
            //HBase 2.0. Doing this the painful String-matching way to be compatible with 1.x
            if (propKey.contains("coprocessor")){
                if (propValue.contains(className)){
                    Assert.assertEquals(className + " is configured incorrectly",
                        expectedConfigValue,
                        propValue);
                    foundConfig = true;
                    break;
                }
            }
        }
        Assert.assertTrue("Couldn't find config for " + className, foundConfig);
    }

    private void removeCoproc(Class clazz, HTableDescriptor descriptor, Admin admin) throws Exception {
       descriptor.removeCoprocessor(clazz.getName());
       admin.modifyTable(descriptor.getTableName(), descriptor);
    }

    private void createIndexTable(String schemaName, String tableName, String indexName)
        throws SQLException {
        Connection conn = getConnection();
        String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
        conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + "(v1)");
    }

    private void addColumnToBaseTable(String schemaName, String tableName, String columnName) throws Exception{
        Connection conn = getConnection();
        String ddl = "ALTER TABLE " + SchemaUtil.getTableName(schemaName, tableName) + " " +
            " ADD " + columnName + " varchar(512)";
        conn.createStatement().execute(ddl);
    }

    private void dropColumnToBaseTable(String schemaName, String tableName, String columnName) throws Exception{
        Connection conn = getConnection();
        String ddl = "ALTER TABLE " + SchemaUtil.getTableName(schemaName, tableName) + " " +
            " DROP COLUMN " + columnName;
        conn.createStatement().execute(ddl);
    }

    private void createBaseTable(String schemaName, String tableName, boolean multiTenant, Integer saltBuckets, String splits)
        throws SQLException {
        Connection conn = getConnection();
        if (isNamespaceMapped) {
            conn.createStatement().execute("CREATE SCHEMA IF NOT EXISTS " + schemaName);
        }
        String ddl = "CREATE TABLE IF NOT EXISTS "
            + SchemaUtil.getTableName(schemaName, tableName) + " (t_id VARCHAR NOT NULL,\n" +
            "k1 VARCHAR NOT NULL,\n" +
            "k2 INTEGER NOT NULL,\n" +
            "v1 VARCHAR,\n" +
            "v2 INTEGER,\n" +
            "CONSTRAINT pk PRIMARY KEY (t_id, k1, k2))\n";
        String ddlOptions = multiTenant ? "MULTI_TENANT=true" : "";
        if (saltBuckets != null) {
            ddlOptions = ddlOptions
                + (ddlOptions.isEmpty() ? "" : ",")
                + "salt_buckets=" + saltBuckets;
        }
        if (splits != null) {
            ddlOptions = ddlOptions
                + (ddlOptions.isEmpty() ? "" : ",")
                + "splits=" + splits;
        }
        conn.createStatement().execute(ddl + ddlOptions);
        conn.close();
    }

    private PhoenixConnection getConnection() throws SQLException{
        Properties props = new Properties();
        props.setProperty(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.toString(isNamespaceMapped));
        return (PhoenixConnection) DriverManager.getConnection(getUrl(),props);
    }

}
