/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.drill.exec.store.log;

import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.io.FileUtils;
import org.apache.drill.categories.RowSetTests;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.logical.FormatPluginConfig;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.record.metadata.SchemaBuilder;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.test.BaseDirTestWatcher;
import org.apache.drill.test.ClusterFixture;
import org.apache.drill.test.ClusterTest;
import org.apache.drill.test.QueryRowSetIterator;
import org.apache.drill.exec.physical.rowSet.RowSet;
import org.apache.drill.test.rowSet.RowSetUtilities;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import com.fasterxml.jackson.databind.ObjectMapper;

import static org.apache.drill.test.rowSet.RowSetUtilities.strArray;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

@Category(RowSetTests.class)
public class TestLogReader extends ClusterTest {

  public static final String DATE_ONLY_PATTERN = "(\\d\\d\\d\\d)-(\\d\\d)-(\\d\\d) .*";

  @ClassRule
  public static final BaseDirTestWatcher dirTestWatcher = new BaseDirTestWatcher();

  protected static File schemaAndConfigDir;
  protected static File schemaOnlyDir;

  private static File tableFuncDir;

  @BeforeClass
  public static void setup() throws Exception {
    ClusterTest.startCluster(ClusterFixture.builder(dirTestWatcher));

    // Define a regex format config for testing.
    defineRegexPlugin();
  }

  private static void defineRegexPlugin() throws ExecutionSetupException {

    // Create an instance of the regex config.
    // Note: we can't use the ".log" extension; the Drill .gitignore
    // file ignores such files, so they'll never get committed. Instead,
    // make up a fake suffix.
    Map<String, FormatPluginConfig> formats = new HashMap<>();
    LogFormatConfig sampleConfig = new LogFormatConfig();
    sampleConfig.setExtension("log1");
    sampleConfig.setRegex(DATE_ONLY_PATTERN);

    sampleConfig.initSchema();
    sampleConfig.getSchema().add(new LogFormatField("year", "INT"));
    sampleConfig.getSchema().add(new LogFormatField("month", "INT"));
    sampleConfig.getSchema().add(new LogFormatField("day", "INT"));
    formats.put("sample", sampleConfig);

    // Full Drill log parser definition.
    LogFormatConfig logConfig = new LogFormatConfig();
    logConfig.setExtension("log1");
    logConfig.setRegex("(\\d\\d\\d\\d)-(\\d\\d)-(\\d\\d) " +
        "(\\d\\d):(\\d\\d):(\\d\\d),\\d+ " +
        "\\[([^]]*)] (\\w+)\\s+(\\S+) - (.*)");

    logConfig.initSchema();
    logConfig.getSchema().add(new LogFormatField("year", "INT"));
    logConfig.getSchema().add(new LogFormatField("month", "INT"));
    logConfig.getSchema().add(new LogFormatField("day", "INT"));
    logConfig.getSchema().add(new LogFormatField("hour", "INT"));
    logConfig.getSchema().add(new LogFormatField("minute", "INT"));
    logConfig.getSchema().add(new LogFormatField("second", "INT"));
    logConfig.getSchema().add(new LogFormatField("thread"));
    logConfig.getSchema().add(new LogFormatField("level"));
    logConfig.getSchema().add(new LogFormatField("module"));
    logConfig.getSchema().add(new LogFormatField("message"));
    formats.put("drill-log", logConfig);

    //Set up additional configs to check the time/date formats
    LogFormatConfig logDateConfig = new LogFormatConfig();
    logDateConfig.setExtension("log2");
    logDateConfig.setRegex("(\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}),(\\d+)\\s\\[(\\w+)\\]\\s([A-Z]+)\\s(.+)");

    logDateConfig.initSchema();
    logDateConfig.getSchema().add(new LogFormatField("entry_date", "TIMESTAMP", "yy-MM-dd hh:mm:ss"));
    logDateConfig.getSchema().add(new LogFormatField("pid", "INT"));
    logDateConfig.getSchema().add(new LogFormatField("location"));
    logDateConfig.getSchema().add(new LogFormatField("message_type"));
    logDateConfig.getSchema().add(new LogFormatField("message"));

    logDateConfig.setMaxErrors(3);
    formats.put("date-log",logDateConfig);

    LogFormatConfig mysqlLogConfig = new LogFormatConfig();
    mysqlLogConfig.setExtension("sqllog");
    mysqlLogConfig.setRegex("(\\d{6})\\s(\\d{2}:\\d{2}:\\d{2})\\s+(\\d+)\\s(\\w+)\\s+(.+)");
    formats.put("mysql-log", mysqlLogConfig);

    // Firewall log file that requires date parsing
    LogFormatConfig firewallConfig = new LogFormatConfig();
    firewallConfig.setRegex("(\\w{3}\\s\\d{1,2}\\s\\d{4}\\s\\d{2}:\\d{2}:\\d{2})\\s+(\\w+)" +
            "\\[(\\d+)\\]:\\s(.*?(\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}).*?)");
    firewallConfig.setExtension("ssdlog");
    firewallConfig.initSchema();
    firewallConfig.getSchema().add(new LogFormatField("eventDate", "TIMESTAMP", "MMM dd yyyy HH:mm:ss"));
    firewallConfig.getSchema().add(new LogFormatField("process_name"));
    firewallConfig.getSchema().add(new LogFormatField("pid", "INT"));
    firewallConfig.getSchema().add(new LogFormatField("message"));
    firewallConfig.getSchema().add(new LogFormatField("src_ip"));
    formats.put("ssdlog", firewallConfig);

    // Define a temporary format plugin for the "cp" storage plugin.
    cluster.defineFormats("cp", formats);

    // Config similar to the above, but with no type info. Types
    // will be provided via the provided schema mechanism. Column names
    // are required so that the format and provided schemas match up.
    LogFormatConfig untypedConfig = new LogFormatConfig();

    untypedConfig.setExtension("logu");
    untypedConfig.setRegex(DATE_ONLY_PATTERN);

    untypedConfig.initSchema();
    untypedConfig.getSchema().add(new LogFormatField("year"));
    untypedConfig.getSchema().add(new LogFormatField("month"));
    untypedConfig.getSchema().add(new LogFormatField("day"));

    // Create a test directory we can write to.
    schemaAndConfigDir = cluster.makeDataDir("sAndC", "logu", untypedConfig);

    // Empty configuration: regex and columns defined in the
    // provided schema
    LogFormatConfig emptyConfig = new LogFormatConfig();
    emptyConfig.setExtension("loge");
    schemaOnlyDir = cluster.makeDataDir("SOnly", "loge", emptyConfig);
    tableFuncDir = cluster.makeDataDir("tf", "logf", emptyConfig);
  }

  @Test
  public void testWildcard() throws RpcException {
    String sql = "SELECT * FROM cp.`regex/simple.log1`";
    RowSet results = client.queryBuilder().sql(sql).rowSet();

    TupleMetadata expectedSchema = new SchemaBuilder()
        .addNullable("year", MinorType.INT)
        .addNullable("month", MinorType.INT)
        .addNullable("day", MinorType.INT)
        .buildSchema();

    RowSet expected = client.rowSetBuilder(expectedSchema)
        .addRow(2017, 12, 17)
        .addRow(2017, 12, 18)
        .addRow(2017, 12, 19)
        .build();

    RowSetUtilities.verify(expected, results);
  }

  /**
   * Tests for no crashes; does not validate results, unfortunately.
   */
  @Test
  public void testWildcardLargeFile() throws RpcException {
    String sql = "SELECT * FROM cp.`regex/large.log1`";
    QueryRowSetIterator iter = client.queryBuilder().sql(sql).rowSetIterator();

    for (RowSet rowSet : iter) {
      rowSet.clear();
    }
  }

  @Test
  public void testExplicitProject() throws RpcException {
    String sql = "SELECT `day`, `month` FROM cp.`regex/simple.log1`";
    RowSet results = client.queryBuilder().sql(sql).rowSet();

    TupleMetadata expectedSchema = new SchemaBuilder()
        .addNullable("day", MinorType.INT)
        .addNullable("month", MinorType.INT)
        .buildSchema();

    RowSet expected = client.rowSetBuilder(expectedSchema)
        .addRow(17, 12)
        .addRow(18, 12)
        .addRow(19, 12)
        .build();

    RowSetUtilities.verify(expected, results);
  }

  @Test
  public void testMissingColumns() throws RpcException {
    String sql = "SELECT `day`, `missing`, `month` FROM cp.`regex/simple.log1`";
    RowSet results = client.queryBuilder().sql(sql).rowSet();

    TupleMetadata expectedSchema = new SchemaBuilder()
        .addNullable("day", MinorType.INT)
        .addNullable("missing", MinorType.VARCHAR)
        .addNullable("month", MinorType.INT)
        .buildSchema();

    RowSet expected = client.rowSetBuilder(expectedSchema)
        .addRow(17, null, 12)
        .addRow(18, null, 12)
        .addRow(19, null, 12)
        .build();

    RowSetUtilities.verify(expected, results);
  }

  @Test
  public void testRaw() throws RpcException {
    String sql = "SELECT `_raw` FROM cp.`regex/simple.log1`";
    RowSet results = client.queryBuilder().sql(sql).rowSet();

    TupleMetadata expectedSchema = new SchemaBuilder()
        .addNullable("_raw", MinorType.VARCHAR)
        .buildSchema();

    RowSet expected = client.rowSetBuilder(expectedSchema)
        .addRow("2017-12-17 10:52:41,820 [main] INFO  o.a.d.e.e.f.FunctionImplementationRegistry - Function registry loaded.  459 functions loaded in 1396 ms.")
        .addRow("2017-12-18 10:52:37,652 [main] INFO  o.a.drill.common.config.DrillConfig - Configuration and plugin file(s) identified in 115ms.")
        .addRow("2017-12-19 11:12:27,278 [main] ERROR o.apache.drill.exec.server.Drillbit - Failure during initial startup of Drillbit.")
        .build();
    RowSetUtilities.verify(expected, results);
  }

  @Test
  public void testDate() throws RpcException {
    String sql = "SELECT TYPEOF(`entry_date`) AS entry_date FROM cp.`regex/simple.log2` LIMIT 1";
    RowSet results = client.queryBuilder().sql(sql).rowSet();

    TupleMetadata expectedSchema = new SchemaBuilder()
        .add("entry_date", MinorType.VARCHAR)
        .buildSchema();

    RowSet expected = client.rowSetBuilder(expectedSchema)
        .addRow("TIMESTAMP")
        .build();

    RowSetUtilities.verify(expected, results);
  }

  @Test
  public void testCount() throws RpcException {
    String sql = "SELECT COUNT(*) FROM cp.`regex/simple.log1`";
    long result = client.queryBuilder().sql(sql).singletonLong();
    assertEquals(3, result);
  }

  @Test
  public void testFull() throws RpcException {
    String sql = "SELECT * FROM cp.`regex/simple.log1`";
    RowSet results = client.queryBuilder().sql(sql).rowSet();

    TupleMetadata expectedSchema = new SchemaBuilder()
        .addNullable("year", MinorType.INT)
        .addNullable("month", MinorType.INT)
        .addNullable("day", MinorType.INT)
        .buildSchema();

    RowSet expected = client.rowSetBuilder(expectedSchema)
        .addRow(2017, 12, 17)
        .addRow(2017, 12, 18)
        .addRow(2017, 12, 19)
        .build();

    RowSetUtilities.verify(expected, results);
  }

  /**
   * Test log queries without a defined schema using select *
   */
  @Test
  public void testStarQueryNoSchema() throws RpcException {
    String sql = "SELECT * FROM cp.`regex/mysql.sqllog`";
    RowSet results = client.queryBuilder().sql(sql).rowSet();

    TupleMetadata expectedSchema = new SchemaBuilder()
        .addArray("columns", MinorType.VARCHAR)
        .buildSchema();

    RowSet expected = client.rowSetBuilder(expectedSchema)
        .addSingleCol(strArray("070823", "21:00:32", "1", "Connect", "root@localhost on test1"))
        .addSingleCol(strArray("070823", "21:00:48", "1", "Query", "show tables"))
        .addSingleCol(strArray("070823", "21:00:56", "1", "Query", "select * from category" ))
        .addSingleCol(strArray("070917", "16:29:01", "21", "Query","select * from location" ))
        .addSingleCol(strArray("070917", "16:29:12", "21", "Query","select * from location where id = 1 LIMIT 1" ))
        .build();

    RowSetUtilities.verify(expected, results);
  }

  @Test
  public void testSomeFieldsQueryNoSchema() throws RpcException {
    String sql = "SELECT columns[0], columns[4] FROM cp.`regex/mysql.sqllog`";
    RowSet results = client.queryBuilder().sql(sql).rowSet();

    TupleMetadata expectedSchema = new SchemaBuilder()
        .addNullable("EXPR$0", MinorType.VARCHAR)
        .addNullable("EXPR$1", MinorType.VARCHAR)
        .buildSchema();

    RowSet expected = client.rowSetBuilder(expectedSchema)
        .addRow("070823", "root@localhost on test1")
        .addRow("070823",  "show tables")
        .addRow("070823",  "select * from category" )
        .addRow("070917",  "select * from location" )
        .addRow("070917", "select * from location where id = 1 LIMIT 1" )
        .build();

    RowSetUtilities.verify(expected, results);
  }

  @Test
  public void testRawNoSchema() throws RpcException {
    String sql = "SELECT _raw FROM cp.`regex/mysql.sqllog`";
    RowSet results = client.queryBuilder().sql(sql).rowSet();

    TupleMetadata expectedSchema = new SchemaBuilder()
        .addNullable("_raw", MinorType.VARCHAR)
        .buildSchema();

    RowSet expected = client.rowSetBuilder(expectedSchema)
        .addRow("070823 21:00:32       1 Connect     root@localhost on test1")
        .addRow("070823 21:00:48       1 Query       show tables")
        .addRow("070823 21:00:56       1 Query       select * from category" )
        .addRow("070917 16:29:01      21 Query       select * from location" )
        .addRow("070917 16:29:12      21 Query       select * from location where id = 1 LIMIT 1" )
        .build();

    RowSetUtilities.verify(expected, results);
  }

  @Test
  public void testUMNoSchema() throws RpcException {
    String sql = "SELECT _unmatched_rows FROM cp.`regex/mysql.sqllog`";
    RowSet results = client.queryBuilder().sql(sql).rowSet();

    TupleMetadata expectedSchema = new SchemaBuilder()
        .addNullable("_unmatched_rows", MinorType.VARCHAR)
        .buildSchema();

    RowSet expected = client.rowSetBuilder(expectedSchema)
        .addRow("dfadkfjaldkjafsdfjlksdjflksjdlkfjsldkfjslkjl")
        .build();

    RowSetUtilities.verify(expected, results);
  }

  @Test
  public void testRawUMNoSchema() throws RpcException {
    String sql = "SELECT _raw, _unmatched_rows FROM cp.`regex/mysql.sqllog`";
    RowSet results = client.queryBuilder().sql(sql).rowSet();

    TupleMetadata expectedSchema = new SchemaBuilder()
        .addNullable("_raw", MinorType.VARCHAR)
        .addNullable("_unmatched_rows", MinorType.VARCHAR)
        .buildSchema();

    RowSet expected = client.rowSetBuilder(expectedSchema)
        .addRow("070823 21:00:32       1 Connect     root@localhost on test1", null)
        .addRow("070823 21:00:48       1 Query       show tables", null)
        .addRow("070823 21:00:56       1 Query       select * from category", null )
        .addRow("070917 16:29:01      21 Query       select * from location", null )
        .addRow("070917 16:29:12      21 Query       select * from location where id = 1 LIMIT 1", null )
        .addRow(null, "dfadkfjaldkjafsdfjlksdjflksjdlkfjsldkfjslkjl")
        .build();

    RowSetUtilities.verify(expected, results);
  }

  /**
   * Build a table, temporary for this test, using the table name and resource
   * provided.
   *
   * @param tableName name of the table within the test-temporary dfs.data
   * workspace
   * @param fileName name of the one and only file in the table (allows using
   * plugin-specific extensions)
   * @param resource path to an existing resource file to copy into the
   * table as the given file name
   * @return the SQL path for the table
   * @throws IOException if the file operations fail
   */
  private String buildTable(File dir, String ws, String tableName,
      String fileName, String resource) throws IOException {

    // We need to create a schema file. Create a temporary test
    // table.
    File tableDir = new File(dir, tableName);
    tableDir.mkdirs();

    // Copy the "simple.log1" data file. Use a distinct extension
    // configured above to provide field names but no types.
    File dest = new File(tableDir, fileName);
    URL url = getClass().getResource(resource);
    FileUtils.copyURLToFile(url, dest);
    return "dfs." + ws + "." + tableName;
  }

  @Test
  public void testProvidedSchema() throws Exception {
    String tablePath = buildTable(schemaAndConfigDir, "sAndC", "schema",
        "sample.logu", "/regex/simple.log1");
    try {

      // Define the provided table schema
      client.alterSession(ExecConstants.STORE_TABLE_USE_SCHEMA_FILE, true);
      String schemaSql = "create schema (`year` int not null, `month` int not null, " +
          "`day` int not null) " +
          "for table " + tablePath;
      run(schemaSql);

      // Run a query using the provided schema.
      String sql = "SELECT * FROM %s";
      RowSet results = client.queryBuilder().sql(sql, tablePath).rowSet();

      // Verify that the returned data used the schema.
      TupleMetadata expectedSchema = new SchemaBuilder()
          .add("year", MinorType.INT)
          .add("month", MinorType.INT)
          .add("day", MinorType.INT)
          .buildSchema();

      RowSet expected = client.rowSetBuilder(expectedSchema)
          .addRow(2017, 12, 17)
          .addRow(2017, 12, 18)
          .addRow(2017, 12, 19)
          .build();

      RowSetUtilities.verify(expected, results);
    } finally {
      client.resetSession(ExecConstants.STORE_TABLE_USE_SCHEMA_FILE);
    }
  }

  /**
   * Test the case in which the plugin config contains no information
   * other than the file extensions. The regex is provided by the provided
   * schema, but no columns are defined, so we use the columns[] array.
   */
  @Test
  public void testSchemaOnlyNoCols() throws Exception {
    String tablePath = buildTable(schemaOnlyDir, "sOnly", "noCols", "sample.loge", "/regex/simple.log1");
    try {
      client.alterSession(ExecConstants.STORE_TABLE_USE_SCHEMA_FILE, true);
      String schemaSql = "create schema ()" +
          " for table %s properties ('%s'='%s')";
      run(schemaSql, tablePath, LogFormatPlugin.REGEX_PROP, DATE_ONLY_PATTERN);

      String sql = "SELECT * FROM %s";
      RowSet results = client.queryBuilder().sql(sql, tablePath).rowSet();

      TupleMetadata expectedSchema = new SchemaBuilder()
          .addArray("columns", MinorType.VARCHAR)
          .buildSchema();

      RowSet expected = client.rowSetBuilder(expectedSchema)
          .addSingleCol(strArray("2017", "12", "17"))
          .addSingleCol(strArray("2017", "12", "18"))
          .addSingleCol(strArray("2017", "12", "19"))
          .build();

      RowSetUtilities.verify(expected, results);
    } finally {
      client.resetSession(ExecConstants.STORE_TABLE_USE_SCHEMA_FILE);
    }
  }

  /**
   * Test the case in which the plugin config contains no information
   * other than the file extensions. The provided schema includes both
   * the regex and the set of columns and types.
   */
  @Test
  public void testSchemaOnlyWithCols() throws Exception {
    String tablePath = buildTable(schemaOnlyDir, "sOnly", "withCols", "sample.loge", "/regex/simple.log1");
    try {
      client.alterSession(ExecConstants.STORE_TABLE_USE_SCHEMA_FILE, true);
      String schemaSql = "create schema (`year` int not null, `month` int not null, " +
          "`day` int not null) " +
          " for table " + tablePath +
          " properties ('" + LogFormatPlugin.REGEX_PROP +
          "'='" + DATE_ONLY_PATTERN + "')";
      run(schemaSql);

      String sql = "SELECT * FROM %s";
      RowSet results = client.queryBuilder().sql(sql, tablePath).rowSet();

      TupleMetadata expectedSchema = new SchemaBuilder()
          .add("year", MinorType.INT)
          .add("month", MinorType.INT)
          .add("day", MinorType.INT)
          .buildSchema();

      RowSet expected = client.rowSetBuilder(expectedSchema)
          .addRow(2017, 12, 17)
          .addRow(2017, 12, 18)
          .addRow(2017, 12, 19)
          .build();

      RowSetUtilities.verify(expected, results);
    } finally {
      client.resetSession(ExecConstants.STORE_TABLE_USE_SCHEMA_FILE);
    }
  }

  /**
   * Corner case: provided schema has the regex and two of the three
   * columns, the third takes a default name and type.
   */
  @Test
  public void testSchemaOnlyWithMissingCols() throws Exception {
    String tablePath = buildTable(schemaOnlyDir, "sOnly", "missingCols", "sample.loge", "/regex/simple.log1");
    try {
      client.alterSession(ExecConstants.STORE_TABLE_USE_SCHEMA_FILE, true);
      String schemaSql = "create schema (`year` int not null, `month` int not null) " +
          " for table " + tablePath +
          " properties ('" + LogFormatPlugin.REGEX_PROP +
          "'='" + DATE_ONLY_PATTERN + "')";
      run(schemaSql);

      String sql = "SELECT * FROM %s";
      RowSet results = client.queryBuilder().sql(sql, tablePath).rowSet();

      TupleMetadata expectedSchema = new SchemaBuilder()
          .add("year", MinorType.INT)
          .add("month", MinorType.INT)
          .addNullable("field_2", MinorType.VARCHAR)
          .buildSchema();

      RowSet expected = client.rowSetBuilder(expectedSchema)
          .addRow(2017, 12, "17")
          .addRow(2017, 12, "18")
          .addRow(2017, 12, "19")
          .build();

      RowSetUtilities.verify(expected, results);
    } finally {
      client.resetSession(ExecConstants.STORE_TABLE_USE_SCHEMA_FILE);
    }
  }

  /**
   * Verify that an error is thrown if no pattern is provided in
   * the plugin config, table function or provided schema.
   */
  @Test
  public void testEmptyPattern() throws Exception {
    String tablePath = buildTable(tableFuncDir, "tf", "emptyRegex",
        "sample.logf", "/regex/simple.log1");
    try {
     String sql = "SELECT * FROM %s";
     client.queryBuilder().sql(sql, tablePath).run();
     fail();
    } catch (Exception e) {
      assertTrue(e.getMessage().contains("Regex property is required"));
    }
  }

  /**
   * Test the ability to use table functions to specify the regex.
   */
  @Test
  public void testTableFunction() throws Exception {
    String tablePath = buildTable(tableFuncDir, "tf", "table1",
        "sample.logf", "/regex/simple.log1");

    // Run a query using a table function.
    String escaped = DATE_ONLY_PATTERN.replace("\\", "\\\\");
    String sql = "SELECT * FROM table(%s(type => '%s', regex => '%s', maxErrors => 10))";
    RowSet results = client.queryBuilder().sql(sql, tablePath,
        LogFormatPlugin.PLUGIN_NAME, escaped).rowSet();

    // Verify that the returned data used the schema.
    TupleMetadata expectedSchema = new SchemaBuilder()
        .addArray("columns", MinorType.VARCHAR)
        .buildSchema();

    RowSet expected = client.rowSetBuilder(expectedSchema)
        .addSingleCol(strArray("2017", "12", "17"))
        .addSingleCol(strArray("2017", "12", "18"))
        .addSingleCol(strArray("2017", "12", "19"))
        .build();

    RowSetUtilities.verify(expected, results);
  }

  /**
   * Test the use of a table function to provide the regex. Verify
   * that the plugin throws an error if no groups are defined.
   */
  @Test
  public void testTableFunctionNoGroups() throws Exception {
    String tablePath = buildTable(tableFuncDir, "tf", "noGroups",
        "sample.logf", "/regex/simple.log1");

    // Use a table function to pass in a regex without a group.

    try {
      String sql = "SELECT * FROM table(%s(type => '%s', regex => '''foo'''))";
      client.queryBuilder().sql(sql, tablePath, LogFormatPlugin.PLUGIN_NAME).run();
      fail();
    } catch (Exception e) {
      assertTrue(e.getMessage().contains("Regex property has no groups"));
    }
  }

  /**
   * Test the use of the schema table function to provide a schema
   * including types. In this form, the regex must be provided by the
   * plugin config or (as in this test), as table properties.
   */
  @Test
  public void testTableFunctionWithSchema() throws Exception {
    String tablePath = buildTable(tableFuncDir, "tf", "table2",
        "sample.logf", "/regex/simple.log1");
    try {
      client.alterSession(ExecConstants.STORE_TABLE_USE_SCHEMA_FILE, true);

      // Run a query using a table function.

      String escaped = DATE_ONLY_PATTERN.replace("\\", "\\\\");
      String sql = "SELECT * FROM table(%s(" +
         "schema=>'inline=(`year` int, `month` int, `day` int) properties {`%s`=`%s`}'))";
      RowSet results = client.queryBuilder().sql(sql, tablePath,
         LogFormatPlugin.REGEX_PROP, escaped).rowSet();

      // Verify that the returned data used the schema.
      TupleMetadata expectedSchema = new SchemaBuilder()
          .addNullable("year", MinorType.INT)
          .addNullable("month", MinorType.INT)
          .addNullable("day", MinorType.INT)
          .buildSchema();

      RowSet expected = client.rowSetBuilder(expectedSchema)
          .addRow(2017, 12, 17)
          .addRow(2017, 12, 18)
          .addRow(2017, 12, 19)
          .build();

      RowSetUtilities.verify(expected, results);
    } finally {
      client.resetSession(ExecConstants.STORE_TABLE_USE_SCHEMA_FILE);
    }
  }

  /**
   * Test that a schema can be provided in a table function that also includes
   * plugin config. This case fails for the log format plugin because, unfortunately,
   * the log format plugin has a config field called "schema" which is not a string
   * and is found by the code before trying to treat "schema" as a schema.
   * So, this test is disabled.
   */
  @Test
  @Ignore("Use of schema conflicts with plugin field")
  public void testTableFunctionWithConfigAndSchema() throws Exception {
    String tablePath = buildTable(tableFuncDir, "tf", "table2",
        "sample.logf", "/regex/simple.log1");
    try {
      client.alterSession(ExecConstants.STORE_TABLE_USE_SCHEMA_FILE, true);

      // Run a query using a table function.
      String escaped = DATE_ONLY_PATTERN.replace("\\", "\\\\");
      String sql = "SELECT * FROM table(%s(type => '%s', regex => '%s', " +
         "schema=>'inline=(`year` int, `month` int, `day` int)'))";
      RowSet results = client.queryBuilder().sql(sql, tablePath,
          LogFormatPlugin.PLUGIN_NAME, escaped).rowSet();

      // Verify that the returned data used the schema.
      TupleMetadata expectedSchema = new SchemaBuilder()
          .addNullable("year", MinorType.INT)
          .addNullable("month", MinorType.INT)
          .addNullable("day", MinorType.INT)
          .buildSchema();

      RowSet expected = client.rowSetBuilder(expectedSchema)
          .addRow(2017, 12, 17)
          .addRow(2017, 12, 18)
          .addRow(2017, 12, 19)
          .build();

      RowSetUtilities.verify(expected, results);
    } finally {
      client.resetSession(ExecConstants.STORE_TABLE_USE_SCHEMA_FILE);
    }
  }

  /**
   * The config classes for this plugin have been trick to get right.
   * The following ensures that the plugin config works correctly
   * to serialize/deserialize values.
   */
  @Test
  public void testPluginSerialization() throws IOException {
    ObjectMapper mapper = new ObjectMapper();
    assertTrue(mapper.canSerialize(LogFormatPlugin.class));

    LogFormatConfig sampleConfig = new LogFormatConfig();
    sampleConfig.setExtension("log1");
    sampleConfig.setRegex(DATE_ONLY_PATTERN);

    sampleConfig.initSchema();
    sampleConfig.getSchema().add(new LogFormatField("year", "INT"));
    sampleConfig.getSchema().add(new LogFormatField("month", "INT"));
    sampleConfig.getSchema().add(new LogFormatField("day", "INT"));
    String json = mapper.writeValueAsString(sampleConfig);
    LogFormatConfig result = mapper.readValue(json, LogFormatConfig.class);

    assertEquals(sampleConfig.getRegex(), result.getRegex());
    assertEquals(sampleConfig.getMaxErrors(), result.getMaxErrors());
    assertEquals(sampleConfig.getSchema(), result.getSchema());
  }

  @Test
  public void testFirewallSchema() throws RpcException {
    String sql = "SELECT * FROM cp.`regex/firewall.ssdlog` limit 0";
    RowSet result = client.queryBuilder().sql(sql).rowSet();
    TupleMetadata expectedSchema = new SchemaBuilder()
        .addNullable("eventDate", MinorType.TIMESTAMP)
        .addNullable("process_name", MinorType.VARCHAR)
        .addNullable("pid", MinorType.INT)
        .addNullable("message", MinorType.VARCHAR)
        .addNullable("src_ip", MinorType.VARCHAR)
        .buildSchema();

    RowSet.SingleRowSet expected = client.rowSetBuilder(expectedSchema).build();

    RowSetUtilities.verify(expected, result);
    result.clear();
  }
}
