/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.metron.dataloads.nonbulk.flatfile;

import org.adrianwalker.multilinestring.Multiline;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.PosixParser;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.test.TestingServer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.metron.common.configuration.ConfigurationsUtils;
import org.apache.metron.dataloads.extractor.csv.CSVExtractor;
import org.apache.metron.dataloads.hbase.mr.HBaseUtil;
import org.apache.metron.enrichment.converter.EnrichmentConverter;
import org.apache.metron.enrichment.converter.EnrichmentKey;
import org.apache.metron.enrichment.converter.EnrichmentValue;
import org.apache.metron.enrichment.lookup.LookupKV;
import org.apache.metron.test.utils.UnitTestHelper;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

import java.io.File;
import java.io.FileOutputStream;
import java.io.PrintWriter;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.zip.GZIPOutputStream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.startsWith;

public class SimpleEnrichmentFlatFileLoaderIntegrationTest {

  private static HBaseTestingUtility testUtil;

  /** The test table. */
  private static HTable testTable;
  private static Configuration config = null;
  private static TestingServer testZkServer;
  private static String zookeeperUrl;
  private static CuratorFramework client;
  private static final String tableName = "enrichment";
  private static final String cf = "cf";
  private static final String csvFile="input.csv";
  private static final String extractorJson = "extractor.json";
  private static final String enrichmentJson = "enrichment_config.json";
  private static final String log4jProperty = "log4j";
  private static final File file1 = new File("target/sefflt_data_1.csv");
  private static final File file2 = new File("target/sefflt_data_2.csv");
  private static final File multilineFile= new File("target/sefflt_data_2.csv");
  private static final File multilineZipFile= new File("target/sefflt_data_2.csv.zip");
  private static final File multilineGzFile= new File("target/sefflt_data_2.csv.gz");
  private static final File lineByLineExtractorConfigFile = new File("target/sefflt_extractorConfig_lbl.json");
  private static final File wholeFileExtractorConfigFile = new File("target/sefflt_extractorConfig_wf.json");
  private static final File stellarExtractorConfigFile = new File("target/sefflt_extractorConfig_stellar.json");
  private static final File customLineByLineExtractorConfigFile = new File("target/sefflt_extractorConfig_custom.json");
  private static final int NUM_LINES = 1000;

  /**
   * {
   *   "enrichment_property" : "valfromglobalconfig"
   * }
   */
  @Multiline
  public static String globalConfig;

  /**
   {
      "config" : {
        "columns" : {
          "host" : 0,
          "meta" : 2
                    },
        "indicator_column" : "host",
        "separator" : ",",
        "type" : "enrichment"
                 },
      "extractor" : "CSV"
   }
   */
  @Multiline
  private static String lineByLineExtractorConfig;

  /**
   {
      "config" : {
        "columns" : {
          "host" : 0,
          "meta" : 2
                    },
        "indicator_column" : "host",
        "separator" : ",",
        "type" : "enrichment"
                 },
      "extractor" : "CSV",
      "inputFormat" : "WHOLE_FILE"
   }
   */
  @Multiline
  private static String wholeFileExtractorConfig;

  /**
   *{
   *  "config" : {
   *    "zk_quorum" : "%ZK_QUORUM%",
   *    "columns" : {
   *      "host" : 0,
   *      "empty" : 1,
   *      "meta" : 2
   *    },
   *    "value_transform" : {
   *      "host" : "TO_UPPER(host)",
   *      "empty" : "enrichment_property"
   *    },
   *    "value_filter" : "LENGTH(host) > 0",
   *    "indicator_column" : "host",
   *    "indicator_transform" : {
   *      "indicator" : "TO_UPPER(indicator)"
   *    },
   *    "indicator_filter" : "LENGTH(indicator) > 0",
   *    "type" : "enrichment",
   *    "separator" : ","
   *  },
   *  "extractor" : "CSV"
   *}
   */
  @Multiline
  public static String stellarExtractorConfig;

  /**
   *{
   *  "config" : {
   *    "columns" : {
   *      "host" : 0,
   *      "meta" : 2
   *    },
   *    "value_transform" : {
   *      "host" : "TO_UPPER(host)"
   *    },
   *    "value_filter" : "LENGTH(host) > 0",
   *    "indicator_column" : "host",
   *    "indicator_transform" : {
   *      "indicator" : "TO_UPPER(indicator)"
   *    },
   *    "indicator_filter" : "LENGTH(indicator) > 0",
   *    "type" : "enrichment",
   *    "separator" : ","
   *  },
   *  "extractor" : "%EXTRACTOR_CLASS%"
   *}
   */
  @Multiline
  private static String customLineByLineExtractorConfig;

  @BeforeClass
  public static void setup() throws Exception {
    UnitTestHelper.setJavaLoggingLevel(Level.SEVERE);
    Map.Entry<HBaseTestingUtility, Configuration> kv = HBaseUtil.INSTANCE.create(true);
    config = kv.getValue();
    testUtil = kv.getKey();
    testTable = testUtil.createTable(Bytes.toBytes(tableName), Bytes.toBytes(cf));
    zookeeperUrl = getZookeeperUrl(config.get("hbase.zookeeper.quorum"), testUtil.getZkCluster().getClientPort());
    setupGlobalConfig(zookeeperUrl);

    for(Result r : testTable.getScanner(Bytes.toBytes(cf))) {
      Delete d = new Delete(r.getRow());
      testTable.delete(d);
    }

    if(lineByLineExtractorConfigFile.exists()) {
      lineByLineExtractorConfigFile.delete();
    }
    Files.write( lineByLineExtractorConfigFile.toPath()
               , lineByLineExtractorConfig.getBytes()
               , StandardOpenOption.CREATE_NEW , StandardOpenOption.TRUNCATE_EXISTING
    );
    if(wholeFileExtractorConfigFile.exists()) {
      wholeFileExtractorConfigFile.delete();
    }
    Files.write( wholeFileExtractorConfigFile.toPath()
               , wholeFileExtractorConfig.getBytes()
               , StandardOpenOption.CREATE_NEW , StandardOpenOption.TRUNCATE_EXISTING
    );
    if(stellarExtractorConfigFile.exists()) {
      stellarExtractorConfigFile.delete();
    }
    Files.write( stellarExtractorConfigFile.toPath()
            , stellarExtractorConfig.replace("%ZK_QUORUM%", zookeeperUrl).getBytes()
            , StandardOpenOption.CREATE_NEW , StandardOpenOption.TRUNCATE_EXISTING
    );
    if(customLineByLineExtractorConfigFile.exists()) {
      customLineByLineExtractorConfigFile.delete();
    }
    Files.write( customLineByLineExtractorConfigFile.toPath()
               , customLineByLineExtractorConfig.replace("%EXTRACTOR_CLASS%", CSVExtractor.class.getName()).getBytes()
               , StandardOpenOption.CREATE_NEW , StandardOpenOption.TRUNCATE_EXISTING
    );
    if(file1.exists()) {
      file1.delete();
    }
    Files.write( file1.toPath()
               , "google1.com,1,foo2\n".getBytes()
               , StandardOpenOption.CREATE_NEW , StandardOpenOption.TRUNCATE_EXISTING
    );
    if(file2.exists()) {
      file2.delete();
    }
    Files.write( file2.toPath()
               , "google2.com,2,foo2\n".getBytes()
               , StandardOpenOption.CREATE_NEW , StandardOpenOption.TRUNCATE_EXISTING
    );

    if(multilineFile.exists()) {
      multilineFile.delete();
    }
    if(multilineGzFile.exists()) {
      multilineGzFile.delete();
    }
    if(multilineGzFile.exists()) {
      multilineZipFile.delete();
    }
    PrintWriter[] pws =new PrintWriter[] {};
    try {
      ZipOutputStream zos = new ZipOutputStream(new FileOutputStream(multilineZipFile));
      ZipEntry entry = new ZipEntry("file");
      zos.putNextEntry(entry);
       pws = new PrintWriter[]{
         new PrintWriter(multilineFile),
         new PrintWriter(zos),
         new PrintWriter(new GZIPOutputStream(new FileOutputStream(multilineGzFile)))
                              };
      for(int i = 0;i < NUM_LINES;++i) {
        for(PrintWriter pw : pws) {
          pw.println("google" + i + ".com," + i + ",foo" + i);
        }
      }
    }
    finally {
      for(PrintWriter pw : pws) {
        pw.close();
      }
    }

  }

  private static String getZookeeperUrl(String host, int port) {
    return host + ":" + port;
  }

  private static void setupGlobalConfig(String zookeeperUrl) throws Exception {
    client = ConfigurationsUtils.getClient(zookeeperUrl);
    client.start();
    ConfigurationsUtils.writeGlobalConfigToZookeeper(globalConfig.getBytes(), zookeeperUrl);
  }

  @AfterClass
  public static void teardown() throws Exception {
    HBaseUtil.INSTANCE.teardown(testUtil);
    file1.delete();
    file2.delete();
    multilineFile.delete();
    multilineGzFile.delete();
    multilineZipFile.delete();
    lineByLineExtractorConfigFile.delete();
    wholeFileExtractorConfigFile.delete();
    stellarExtractorConfigFile.delete();
    customLineByLineExtractorConfigFile.delete();
  }


  @Test
  public void testArgs() throws Exception {
    String[] argv = {"-c cf", "-t enrichment"
            , "-e extractor.json", "-n enrichment_config.json"
            , "-l log4j", "-i input.csv"
            , "-p 2", "-b 128", "-q"
    };

    String[] otherArgs = new GenericOptionsParser(config, argv).getRemainingArgs();

    CommandLine cli = LoadOptions.parse(new PosixParser(), otherArgs);
    Assert.assertEquals(extractorJson, LoadOptions.EXTRACTOR_CONFIG.get(cli).trim());
    Assert.assertEquals(cf, LoadOptions.HBASE_CF.get(cli).trim());
    Assert.assertEquals(tableName, LoadOptions.HBASE_TABLE.get(cli).trim());
    Assert.assertEquals(enrichmentJson, LoadOptions.ENRICHMENT_CONFIG.get(cli).trim());
    Assert.assertEquals(csvFile, LoadOptions.INPUT.get(cli).trim());
    Assert.assertEquals(log4jProperty, LoadOptions.LOG4J_PROPERTIES.get(cli).trim());
    Assert.assertEquals("2", LoadOptions.NUM_THREADS.get(cli).trim());
    Assert.assertEquals("128", LoadOptions.BATCH_SIZE.get(cli).trim());
  }

  @Test
  public void testLocalLineByLine() throws Exception {
    String[] argv = {"-c cf", "-t enrichment"
            , "-e " + lineByLineExtractorConfigFile.getPath()
            , "-i " + multilineFile.getPath()
            , "-p 2", "-b 128", "-q"
    };
    SimpleEnrichmentFlatFileLoader.main(config, argv);
    EnrichmentConverter converter = new EnrichmentConverter();
    ResultScanner scanner = testTable.getScanner(Bytes.toBytes(cf));
    List<LookupKV<EnrichmentKey, EnrichmentValue>> results = new ArrayList<>();
    for (Result r : scanner) {
      results.add(converter.fromResult(r, cf));
      testTable.delete(new Delete(r.getRow()));
    }
    Assert.assertEquals(NUM_LINES, results.size());
    Assert.assertTrue(results.get(0).getKey().indicator.startsWith("google"));
    Assert.assertEquals(results.get(0).getKey().type, "enrichment");
    Assert.assertEquals(results.get(0).getValue().getMetadata().size(), 2);
    Assert.assertTrue(results.get(0).getValue().getMetadata().get("meta").toString().startsWith("foo"));
    Assert.assertTrue(results.get(0).getValue().getMetadata().get("host").toString().startsWith("google"));
  }

  @Test
  public void testLocalLineByLine_gz() throws Exception {
    String[] argv = {"-c cf", "-t enrichment"
            , "-e " + lineByLineExtractorConfigFile.getPath()
            , "-i " + multilineGzFile.getPath()
            , "-p 2", "-b 128", "-q"
    };
    SimpleEnrichmentFlatFileLoader.main(config, argv);
    EnrichmentConverter converter = new EnrichmentConverter();
    ResultScanner scanner = testTable.getScanner(Bytes.toBytes(cf));
    List<LookupKV<EnrichmentKey, EnrichmentValue>> results = new ArrayList<>();
    for (Result r : scanner) {
      results.add(converter.fromResult(r, cf));
      testTable.delete(new Delete(r.getRow()));
    }
    Assert.assertEquals(NUM_LINES, results.size());
    Assert.assertTrue(results.get(0).getKey().indicator.startsWith("google"));
    Assert.assertEquals(results.get(0).getKey().type, "enrichment");
    Assert.assertEquals(results.get(0).getValue().getMetadata().size(), 2);
    Assert.assertTrue(results.get(0).getValue().getMetadata().get("meta").toString().startsWith("foo"));
    Assert.assertTrue(results.get(0).getValue().getMetadata().get("host").toString().startsWith("google"));

  }

  @Test
  public void testLocalLineByLine_zip() throws Exception {
    String[] argv = {"-c cf", "-t enrichment"
            , "-e " + lineByLineExtractorConfigFile.getPath()
            , "-i " + multilineZipFile.getPath()
            , "-p 2", "-b 128", "-q"
    };
    SimpleEnrichmentFlatFileLoader.main(config, argv);
    EnrichmentConverter converter = new EnrichmentConverter();
    ResultScanner scanner = testTable.getScanner(Bytes.toBytes(cf));
    List<LookupKV<EnrichmentKey, EnrichmentValue>> results = new ArrayList<>();
    for (Result r : scanner) {
      results.add(converter.fromResult(r, cf));
      testTable.delete(new Delete(r.getRow()));
    }
    Assert.assertEquals(NUM_LINES, results.size());
    Assert.assertTrue(results.get(0).getKey().indicator.startsWith("google"));
    Assert.assertEquals(results.get(0).getKey().type, "enrichment");
    Assert.assertEquals(results.get(0).getValue().getMetadata().size(), 2);
    Assert.assertTrue(results.get(0).getValue().getMetadata().get("meta").toString().startsWith("foo"));
    Assert.assertTrue(results.get(0).getValue().getMetadata().get("host").toString().startsWith("google"));

  }

  @Test
  public void testLocalWholeFile() throws Exception {
    String[] argv = { "-c cf", "-t enrichment"
            , "-e " + wholeFileExtractorConfigFile.getPath()
            , "-i " + file1.getPath() + "," + file2.getPath()
            , "-p 2", "-b 128", "-q"
    };
    SimpleEnrichmentFlatFileLoader.main(config, argv);
    EnrichmentConverter converter = new EnrichmentConverter();
    ResultScanner scanner = testTable.getScanner(Bytes.toBytes(cf));
    List<LookupKV<EnrichmentKey, EnrichmentValue>> results = new ArrayList<>();
    for(Result r : scanner) {
      results.add(converter.fromResult(r, cf));
      testTable.delete(new Delete(r.getRow()));
    }
    Assert.assertEquals(2, results.size());
    Assert.assertTrue(results.get(0).getKey().indicator.startsWith("google"));
    Assert.assertEquals(results.get(0).getKey().type, "enrichment");
    Assert.assertEquals(results.get(0).getValue().getMetadata().size(), 2);
    Assert.assertTrue(results.get(0).getValue().getMetadata().get("meta").toString().startsWith("foo"));
    Assert.assertTrue(results.get(0).getValue().getMetadata().get("host").toString().startsWith( "google"));

  }

  @Test
  public void testMRLineByLine() throws Exception {
    String[] argv = {"-c cf", "-t enrichment"
            , "-e " + lineByLineExtractorConfigFile.getPath()
            , "-i " + multilineFile.getName()
            , "-m MR"
            , "-p 2", "-b 128", "-q"
    };
    FileSystem fs = FileSystem.get(config);
    HBaseUtil.INSTANCE.writeFile(new String(Files.readAllBytes(multilineFile.toPath())), new Path(multilineFile.getName()), fs);
    SimpleEnrichmentFlatFileLoader.main(config, argv);
    EnrichmentConverter converter = new EnrichmentConverter();
    ResultScanner scanner = testTable.getScanner(Bytes.toBytes(cf));
    List<LookupKV<EnrichmentKey, EnrichmentValue>> results = new ArrayList<>();
    for (Result r : scanner) {
      results.add(converter.fromResult(r, cf));
      testTable.delete(new Delete(r.getRow()));
    }
    Assert.assertEquals(NUM_LINES, results.size());
    Assert.assertTrue(results.get(0).getKey().indicator.startsWith("google"));
    Assert.assertEquals(results.get(0).getKey().type, "enrichment");
    Assert.assertEquals(results.get(0).getValue().getMetadata().size(), 2);
    Assert.assertTrue(results.get(0).getValue().getMetadata().get("meta").toString().startsWith("foo"));
    Assert.assertTrue(results.get(0).getValue().getMetadata().get("host").toString().startsWith("google"));
  }

  @Test
  public void stellar_transforms_and_filters_indicators_and_value_metadata() throws Exception {
    String[] argv = {"-c cf", "-t enrichment"
            , "-e " + stellarExtractorConfigFile.getPath()
            , "-i " + multilineFile.getPath()
            , "-p 2", "-b 128", "-q"
    };
    SimpleEnrichmentFlatFileLoader.main(config, argv);
    EnrichmentConverter converter = new EnrichmentConverter();
    ResultScanner scanner = testTable.getScanner(Bytes.toBytes(cf));
    List<LookupKV<EnrichmentKey, EnrichmentValue>> results = new ArrayList<>();
    for (Result r : scanner) {
      results.add(converter.fromResult(r, cf));
      testTable.delete(new Delete(r.getRow()));
    }
    Assert.assertEquals(NUM_LINES, results.size());
    Assert.assertThat(results.get(0).getKey().getIndicator(), startsWith("GOOGLE"));
    Assert.assertThat(results.get(0).getKey().type, equalTo("enrichment"));
    Assert.assertThat(results.get(0).getValue().getMetadata().size(), equalTo(3));
    Assert.assertThat(results.get(0).getValue().getMetadata().get("meta").toString(), startsWith("foo"));
    Assert.assertThat(results.get(0).getValue().getMetadata().get("empty").toString(), startsWith("valfromglobalconfig"));
    Assert.assertThat(results.get(0).getValue().getMetadata().get("host").toString(), startsWith("GOOGLE"));
  }

  @Test
  public void custom_extractor_transforms_and_filters_indicators_and_value_metadata() throws Exception {
    String[] argv = {"-c cf", "-t enrichment"
            , "-e " + customLineByLineExtractorConfigFile.getPath()
            , "-i " + multilineFile.getPath()
            , "-p 2", "-b 128", "-q"
    };
    SimpleEnrichmentFlatFileLoader.main(config, argv);
    EnrichmentConverter converter = new EnrichmentConverter();
    ResultScanner scanner = testTable.getScanner(Bytes.toBytes(cf));
    List<LookupKV<EnrichmentKey, EnrichmentValue>> results = new ArrayList<>();
    for (Result r : scanner) {
      results.add(converter.fromResult(r, cf));
      testTable.delete(new Delete(r.getRow()));
    }
    Assert.assertEquals(NUM_LINES, results.size());
    Assert.assertThat(results.get(0).getKey().getIndicator(), startsWith("GOOGLE"));
    Assert.assertThat(results.get(0).getKey().type, equalTo("enrichment"));
    Assert.assertThat(results.get(0).getValue().getMetadata().size(), equalTo(2));
    Assert.assertThat(results.get(0).getValue().getMetadata().get("meta").toString(), startsWith("foo"));
    Assert.assertThat(results.get(0).getValue().getMetadata().get("host").toString(), startsWith("GOOGLE"));
  }

}
