/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.pig.builtin;

import static org.apache.pig.builtin.mock.Storage.resetData;
import static org.apache.pig.builtin.mock.Storage.tuple;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintStream;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.Iterator;

import com.google.common.io.Closeables;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericContainer;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.tool.DataFileWriteTool;
import org.apache.avro.tool.Tool;
import org.apache.avro.tool.TrevniCreateRandomTool;
import org.apache.avro.tool.TrevniToJsonTool;
import org.apache.avro.util.Utf8;
import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecJob;
import org.apache.pig.backend.executionengine.ExecJob.JOB_STATUS;
import org.apache.pig.builtin.mock.Storage.Data;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.util.Utils;
import org.apache.pig.impl.util.avro.AvroBagWrapper;
import org.apache.pig.impl.util.avro.AvroMapWrapper;
import org.apache.pig.impl.util.avro.AvroTupleWrapper;
import org.apache.pig.test.Util;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

import com.google.common.collect.ImmutableMap;

public class TestAvroStorage {

    final protected static Log LOG = LogFactory.getLog(TestAvroStorage.class);

    final private static String basedir = "test/org/apache/pig/builtin/avro/";
    private static String outbasedir = System.getProperty("user.dir") + "/build/test/TestAvroStorage/";
    final private static String[] datadir = {
        "data/avro",
        "data/avro/compressed",
        "data/avro/compressed/snappy",
        "data/avro/compressed/deflate",
        "data/avro/uncompressed",
        "data/avro/uncompressed/testdirectory",
        "data/json/testdirectory",
        "data/trevni",
        "data/trevni/uncompressed",
    };
    final private static String[] avroSchemas = {
        "arraysAsOutputByPig",
        "arrays",
        "recordsAsOutputByPig",
        "recordsAsOutputByPigWithDates",
        "records",
        "recordsOfArrays",
        "recordsOfStringArrays",
        "recordsOfArraysOfRecords",
        "recordsSubSchema",
        "recordsSubSchemaNullable",
        "recordsWithDoubleUnderscores",
        "recordsWithEnums",
        "recordsWithFixed",
        "recordsWithMaps",
        "recordsWithMapsOfRecords",
        "recordsWithMapsOfArrayOfRecords",
        "recordsWithNullableUnions",
        "recordWithRepeatedSubRecords",
        "recursiveRecord",
        "projectionTest",
        "projectionTestWithSchema",
        "recordsWithSimpleUnion",
        "recordsWithSimpleUnionOutput",
    };
    final private static String[] trevniSchemas = {
        "simpleRecordsTrevni",
    };

    private static PigServer pigServerLocal = null;

    public static final PathFilter hiddenPathFilter = new PathFilter() {
        @Override
        public boolean accept(Path p) {
          String name = p.getName();
          return !name.startsWith("_") && !name.startsWith(".");
        }
      };

    private static String loadFileIntoString(String file) throws IOException {
      return FileUtils.readFileToString(new File(file)).replace("\n", " ");
    }

    @BeforeClass
    public static void setup() throws Exception {
        pigServerLocal = new PigServer(Util.getLocalTestMode());
        Util.deleteDirectory(new File(outbasedir));
        generateInputFiles();
    }

    @AfterClass
    public static void teardown() throws IOException {
        if(pigServerLocal != null) {
            pigServerLocal.shutdown();
        }
        deleteInputFiles();
    }

    /**
     * Generate input files for test cases
     */
    private static void generateInputFiles() throws IOException {
        String data;
        String json;
        String avro;
        String trevni;
        String schema;

        for (String fn : datadir) {
            FileUtils.forceMkdir(new File(basedir + fn));
        }

        for (String fn : avroSchemas) {
            schema = basedir + "schema/" + fn + ".avsc";
            json = basedir + "data/json/" + fn + ".json";
            avro = basedir + "data/avro/uncompressed/" + fn + ".avro";
            LOG.info("creating " + avro);
            generateAvroFile(schema, json, avro, null);
        }

        for (String fn : trevniSchemas) {
            schema = basedir + "schema/" + fn + ".avsc";
            trevni = basedir + "data/trevni/uncompressed/" + fn + ".trevni";
            json = basedir + "data/json/" + fn + ".json";
            LOG.info("creating " + trevni);
            generateRandomTrevniFile(schema, "1000", trevni);
            LOG.info("creating " + json);
            convertTrevniToJsonFile(schema, trevni, json);

            schema = basedir + "schema/" + fn + ".avsc";
            json = basedir + "data/json/" + fn + ".json";
            avro = basedir + "data/avro/uncompressed/" + fn + ".avro";
            LOG.info("creating " + avro);
            generateAvroFile(schema, json, avro, null);
        }

        PrintWriter w;
        int itemSum = 0;
        int recordCount = 0;
        int evenFileNameItemSum = 0;
        int evenFileNameRecordCount = 0;

        for (int i = 0; i < 8; i++) {
            schema = basedir + "schema/testDirectory.avsc";
            json = basedir + "data/json/testdirectory/part-m-0000" + i;
            avro = basedir + "data/avro/uncompressed/testdirectory/part-m-0000" + i + ".avro";
            LOG.info("creating " + json);
            w = new PrintWriter(new FileWriter(json));
            for (int j = i*1000; j < (i+1)*1000; j++) {
                itemSum += j;
                recordCount += 1;
                evenFileNameItemSum += ((i+1) % 2) * j;
                evenFileNameRecordCount += (i+1) % 2;
                w.println("{\"item\" : " + j + ", \"timestamp\" : " + System.currentTimeMillis() +  " }\n");
            }
            w.close();
            generateAvroFile(schema, json, avro, null);
        }

        schema = basedir + "schema/testDirectoryCounts.avsc";
        json = basedir + "data/json/testDirectoryCounts.json";
        avro = basedir + "data/avro/uncompressed/testDirectoryCounts.avro";
        data = "{\"itemSum\" : {\"int\" : " + itemSum + "}, \"n\" : {\"int\" : " + recordCount + "} }";
        LOG.info("creating " + json);
        FileUtils.writeStringToFile(new File(json), data);
        LOG.info("creating " + avro);
        generateAvroFile(schema, json, avro, null);

        schema = basedir + "schema/testDirectoryCounts.avsc";
        json = basedir + "data/json/evenFileNameTestDirectoryCounts.json";
        avro = basedir + "data/avro/uncompressed/evenFileNameTestDirectoryCounts.avro";
        data = "{\"itemSum\" : {\"int\" : " + evenFileNameItemSum + "}, \"n\" : {\"int\" : " + evenFileNameRecordCount + "} }";
        LOG.info("creating " + json);
        FileUtils.writeStringToFile(new File(json), data);
        LOG.info("creating " + avro);
        generateAvroFile(schema, json, avro, null);

        for (String codec : new String[] {"deflate", "snappy"}) {
            for (String fn : new String[] {"records",  "recordsAsOutputByPig"}) {
                schema = basedir + "schema/" + fn + ".avsc";
                json = basedir + "data/json/" + fn + ".json";
                avro = basedir + "data/avro/compressed/" + codec + "/" + fn + ".avro";
                LOG.info("creating " + avro);
                generateAvroFile(schema, json, avro, codec);
            }
        }
    }

    /**
     * Clean up generated input files
     */
    private static void deleteInputFiles() throws IOException {
        // Delete auto-generated directories
        for (String fn : datadir) {
            LOG.info("Deleting " + basedir + fn);
            FileUtils.deleteQuietly(new File(basedir + fn));
        }
        // Delete auto-generated json files
        String json;
        for (String fn : trevniSchemas) {
            json = basedir + "data/json/" + fn + ".json";
            LOG.info("Deleting " + json);
            FileUtils.deleteQuietly(new File(json));
        }
        json = basedir + "data/json/testDirectoryCounts.json";
        LOG.info("Deleting " + json);
        FileUtils.deleteQuietly(new File(json));
        json = basedir + "data/json/evenFileNameTestDirectoryCounts.json";
        LOG.info("Deleting " + json);
        FileUtils.deleteQuietly(new File(json));
    }

    private static void generateAvroFile(String schema, String json, String avro, String codec) throws IOException {
        Tool tool = new DataFileWriteTool();
        List<String> args = new ArrayList<String>();
        args.add("--schema-file");
        args.add(schema);
        args.add(json);
        if (codec != null) {
            args.add("--codec");
            args.add(codec);
        }
        try {
            StringBuffer sb = new StringBuffer();
            for (String a : args) {
                sb.append(a);
                sb.append(" ");
            }
            PrintStream out = new PrintStream(avro);
            tool.run(System.in, out, System.err, args);
        } catch (Exception e) {
            LOG.info("Could not generate avro file: " + avro, e);
            throw new IOException();
        }
    }

    private static void generateRandomTrevniFile(String schema, String count, String trevni) throws IOException {
        Tool tool = new TrevniCreateRandomTool();
        List<String> args = new ArrayList<String>();
        args.add(schema);
        args.add(count);
        args.add(trevni);
        try {
            tool.run(System.in, System.out, System.err, args);
        } catch (Exception e) {
            LOG.info("Could not generate trevni file: " + trevni, e);
            throw new IOException();
        }
    }

    private static void convertTrevniToJsonFile(String schema, String trevni, String json) throws IOException {
        Tool tool = new TrevniToJsonTool();
        List<String> args = new ArrayList<String>();
        args.add(trevni);
        try {
            PrintStream out = new PrintStream(json);
            tool.run(System.in, out, System.err, args);
        } catch (Exception e) {
            LOG.info("Could not generate json file: " + json, e);
            throw new IOException();
        }
    }

    private String createOutputName() {
        final StackTraceElement[] st = Thread.currentThread().getStackTrace();
        if(Util.WINDOWS){
            outbasedir = outbasedir.replace('\\','/');
        }

        int index;
        if (Utils.isVendorIBM()) {
            index = 3;
        } else {
            index = 2;
        }

        return outbasedir + st[index].getMethodName();
    }

    @Test
    public void testLoadRecordsOfStringArrays() throws Exception {
      final String input = basedir + "data/avro/uncompressed/recordsOfStringArrays.avro";
      testAvroStorage(true, basedir + "code/pig/dump.pig",
          ImmutableMap.of(
              "INFILE",             input,
              "AVROSTORAGE_OUT_2", "-f " + basedir + "schema/recordsOfStringArrays.avsc",
              "OUTFILE",            createOutputName())
        );
    }


    @Test
    public void testLoadRecords() throws Exception {
      final String input = basedir + "data/avro/uncompressed/records.avro";
      final String check = basedir + "data/avro/uncompressed/recordsAsOutputByPig.avro";
      testAvroStorage(true, basedir + "code/pig/identity_ao2.pig",
          ImmutableMap.of(
              "INFILE",           input,
              "AVROSTORAGE_OUT_1", "records",
              "AVROSTORAGE_OUT_2", "-n org.apache.pig.test.builtin",
              "OUTFILE",          createOutputName())
        );
      verifyResults(createOutputName(),check);
    }

    @Test
    public void testLoadRecordsWithSimpleUnion() throws Exception {
        final String input = basedir + "data/avro/uncompressed/recordsWithSimpleUnion.avro";
        final String check = basedir + "data/avro/uncompressed/recordsWithSimpleUnionOutput.avro";
        testAvroStorage(true, basedir + "code/pig/identity_ao2.pig",
            ImmutableMap.of(
                 "INFILE",            input,
                 "AVROSTORAGE_OUT_1", "records",
                 "AVROSTORAGE_OUT_2", "-n org.apache.pig.test.builtin -f " + basedir + "schema/recordsSubSchema.avsc",
                 "OUTFILE",           createOutputName())
        );
      verifyResults(createOutputName(),check);
    }

    @Test
    public void testProjection() throws Exception {
        final String input = basedir + "data/avro/uncompressed/records.avro";
        final String check = basedir + "data/avro/uncompressed/projectionTest.avro";
        testAvroStorage(true, basedir + "code/pig/projection_test.pig",
            ImmutableMap.of(
                "INFILE",           input,
                "AVROSTORAGE_OUT_1", "projectionTest",
                "AVROSTORAGE_OUT_2", "-n org.apache.pig.test.builtin",
                "OUTFILE",          createOutputName())
        );
      verifyResults(createOutputName(),check);
    }

    @Test
    public void testProjectionWithSchema() throws Exception {
        final String input = basedir + "data/avro/uncompressed/records.avro";
        final String check = basedir + "data/avro/uncompressed/projectionTestWithSchema.avro";
        testAvroStorage(true, basedir + "code/pig/projection_test_with_schema.pig",
                ImmutableMap.of(
                        "INFILE",           input,
                        "AVROSTORAGE_IN_2",  "-f " + basedir + "schema/records.avsc",
                        "AVROSTORAGE_OUT_1", "projectionTest",
                        "AVROSTORAGE_OUT_2", "-n org.apache.pig.test.builtin",
                        "OUTFILE",          createOutputName())
        );
      verifyResults(createOutputName(),check);
    }

    @Test
    public void testDates() throws Exception {
      final String input = basedir + "data/avro/uncompressed/records.avro";
      final String check = basedir + "data/avro/uncompressed/recordsAsOutputByPigWithDates.avro";
      testAvroStorage(true, basedir + "code/pig/with_dates.pig",
          ImmutableMap.of(
              "INFILE",           input,
              "AVROSTORAGE_OUT_1", "records",
              "AVROSTORAGE_OUT_2", "-n org.apache.pig.test.builtin",
              "OUTFILE",          createOutputName())
        );
      verifyResults(createOutputName(),check);
    }

    @Test
    public void testLoadRecordsSpecifyFullSchema() throws Exception {
      final String input = basedir + "data/avro/uncompressed/records.avro";
      final String check = basedir + "data/avro/uncompressed/recordsAsOutputByPig.avro";
      final String schema = loadFileIntoString(basedir + "schema/records.avsc");
      testAvroStorage(true, basedir + "code/pig/identity_ai1_ao2.pig",
          ImmutableMap.of(
               "INFILE",            input,
               "OUTFILE",           createOutputName(),
               "AVROSTORAGE_OUT_1", "records",
               "AVROSTORAGE_OUT_2", "-n org.apache.pig.test.builtin",
               "AVROSTORAGE_IN_1",  schema)
        );
      verifyResults(createOutputName(),check);
    }

    @Test
    public void testLoadRecordsSpecifyFullSchemaFromClass() throws Exception {
      final String input = basedir + "data/avro/uncompressed/records.avro";
      final String check = basedir + "data/avro/uncompressed/recordsAsOutputByPig.avro";
      testAvroStorage(true, basedir + "code/pig/identity.pig",
          ImmutableMap.of(
               "INFILE",            input,
               "OUTFILE",           createOutputName(),
               "AVROSTORAGE_IN_2",  "-c org.apache.pig.builtin.avro.code.java.RecordPojo",
               "AVROSTORAGE_OUT_1", "''",
               "AVROSTORAGE_OUT_2", "-c org.apache.pig.builtin.avro.code.java.RecordPojo")
        );
      verifyResults(createOutputName(),check);
    }

    @Test
    public void testLoadRecordsSpecifyFullSchemaFromFile() throws Exception {
      final String input = basedir + "data/avro/uncompressed/records.avro";
      final String check = basedir + "data/avro/uncompressed/recordsAsOutputByPig.avro";
      testAvroStorage(true, basedir + "code/pig/identity.pig",
          ImmutableMap.of(
               "INFILE",            input,
               "OUTFILE",           createOutputName(),
               "AVROSTORAGE_OUT_1", "records",
               "AVROSTORAGE_OUT_2", "-n org.apache.pig.test.builtin",
               "AVROSTORAGE_IN_2",  "-f " + basedir + "schema/records.avsc")
        );
      verifyResults(createOutputName(),check);
    }

    @Test
    public void testLoadRecordsSpecifySubSchema() throws Exception {
      final String input = basedir + "data/avro/uncompressed/records.avro";
      final String check = basedir + "data/avro/uncompressed/recordsSubSchema.avro";
      testAvroStorage(true, basedir + "code/pig/identity_ai1_ao2.pig",
          ImmutableMap.of(
               "INFILE",            input,
               "OUTFILE",           createOutputName(),
               "AVROSTORAGE_OUT_1", "records",
               "AVROSTORAGE_OUT_2", "-n org.apache.pig.test.builtin -f " + basedir + "schema/recordsSubSchema.avsc",
               "AVROSTORAGE_IN_1",  loadFileIntoString(basedir + "schema/recordsSubSchema.avsc"))
        );
      verifyResults(createOutputName(),check);
    }

    @Test
    public void testLoadRecordsSpecifySubSchemaFromFile() throws Exception {
      final String input = basedir + "data/avro/uncompressed/records.avro";
      final String check = basedir + "data/avro/uncompressed/recordsSubSchema.avro";
      testAvroStorage(true, basedir + "code/pig/identity_blank_first_args.pig",
          ImmutableMap.of(
               "INFILE",            input,
               "OUTFILE",           createOutputName(),
               "AVROSTORAGE_OUT_2", "-f " + basedir + "schema/recordsSubSchema.avsc",
               "AVROSTORAGE_IN_2",  "-f " + basedir + "schema/recordsSubSchema.avsc")
        );
      verifyResults(createOutputName(),check);
    }

    @Test
    public void testLoadRecordsSpecifySubSchemaFromExampleFile() throws Exception {
      final String input = basedir + "data/avro/uncompressed/records.avro";
      final String check = basedir + "data/avro/uncompressed/recordsSubSchema.avro";
      testAvroStorage(true, basedir + "code/pig/identity_blank_first_args.pig",
          ImmutableMap.of(
               "INFILE",            input,
               "OUTFILE",           createOutputName(),
               "AVROSTORAGE_OUT_2", "-f " + basedir + "schema/recordsSubSchema.avsc",
               "AVROSTORAGE_IN_2",  "-e " + basedir + "data/avro/uncompressed/recordsSubSchema.avro")
        );
      verifyResults(createOutputName(),check);
    }

    @Test
    public void testLoadRecordsOfArrays() throws Exception {
      final String input = basedir + "data/avro/uncompressed/recordsOfArrays.avro";
      final String check = input;
      testAvroStorage(true, basedir + "code/pig/identity_just_ao2.pig",
          ImmutableMap.of(
              "INFILE",             input,
              "AVROSTORAGE_OUT_2", "-f " + basedir + "schema/recordsOfArrays.avsc",
              "OUTFILE",            createOutputName())
        );
      verifyResults(createOutputName(),check);
    }

    @Test
    public void testLoadRecordsOfArraysOfRecords() throws Exception {
      final String input = basedir + "data/avro/uncompressed/recordsOfArraysOfRecords.avro";
      final String check = input;
      testAvroStorage(true, basedir + "code/pig/identity_just_ao2.pig",
          ImmutableMap.of(
              "INFILE",             input,
              "AVROSTORAGE_OUT_2", "-f " + basedir + "schema/recordsOfArraysOfRecords.avsc",
              "OUTFILE",            createOutputName())
        );
      verifyResults(createOutputName(),check);
    }

    @Test
    public void testLoadRecordsWithEnums() throws Exception {
      final String input = basedir + "data/avro/uncompressed/recordsWithEnums.avro";
      final String check = input;
      testAvroStorage(true, basedir + "code/pig/identity_just_ao2.pig",
          ImmutableMap.of(
              "INFILE",             input,
              "AVROSTORAGE_OUT_2", "-f " + basedir + "schema/recordsWithEnums.avsc",
              "OUTFILE",            createOutputName())
        );
      verifyResults(createOutputName(),check);
    }

    @Test
    public void testLoadRecordsWithFixed() throws Exception {
      final String input = basedir + "data/avro/uncompressed/recordsWithFixed.avro";
      final String check = input;
      testAvroStorage(true, basedir + "code/pig/identity_just_ao2.pig",
          ImmutableMap.of(
              "INFILE",             input,
              "AVROSTORAGE_OUT_2", "-f " + basedir + "schema/recordsWithFixed.avsc",
              "OUTFILE",            createOutputName())
        );
      verifyResults(createOutputName(),check);
    }

    @Test
    public void testLoadRecordsWithMaps() throws Exception {
      final String input = basedir + "data/avro/uncompressed/recordsWithMaps.avro";
      final String check = input;
      testAvroStorage(true, basedir + "code/pig/identity_just_ao2.pig",
          ImmutableMap.of(
              "INFILE",             input,
              "AVROSTORAGE_OUT_2", "-f " + basedir + "schema/recordsWithMaps.avsc",
              "OUTFILE",            createOutputName())
        );
      verifyResults(createOutputName(),check);
    }

    @Test
    public void testLoadRecordsWithMapsOfRecords() throws Exception {
      final String input = basedir + "data/avro/uncompressed/recordsWithMapsOfRecords.avro";
      final String check = input;
      testAvroStorage(true, basedir + "code/pig/identity_just_ao2.pig",
          ImmutableMap.of(
              "INFILE",             input,
              "AVROSTORAGE_OUT_2", "-f " + basedir + "schema/recordsWithMapsOfRecords.avsc",
              "OUTFILE",            createOutputName())
        );
      verifyResults(createOutputName(),check);
    }

    @Test
    public void testLoadRecordsWithMapsOfArrayOfRecords() throws Exception {
        final String input = basedir + "data/avro/uncompressed/recordsWithMapsOfArrayOfRecords.avro";
        final String check = input;
        testAvroStorage(true, basedir + "code/pig/identity_just_ao2.pig",
            ImmutableMap.of(
                "INFILE",             input,
                "AVROSTORAGE_OUT_2", "-f " + basedir + "schema/recordsWithMapsOfArrayOfRecords.avsc",
                "OUTFILE",            createOutputName())
          );
        verifyResults(createOutputName(),check);
      }

    @Test
    public void testLoadRecordsWithNullableUnions() throws Exception {
      final String input = basedir + "data/avro/uncompressed/recordsWithNullableUnions.avro";
      final String check = input;
      testAvroStorage(true, basedir + "code/pig/identity_just_ao2.pig",
          ImmutableMap.of(
              "INFILE",             input,
              "AVROSTORAGE_OUT_2", "-f " + basedir + "schema/recordsWithNullableUnions.avsc",
              "OUTFILE",            createOutputName())
        );
      verifyResults(createOutputName(),check);
    }

    @Test
    public void testLoadDeflateCompressedRecords() throws Exception {
      final String input = basedir + "data/avro/compressed/deflate/records.avro";
      final String check = basedir + "data/avro/uncompressed/recordsAsOutputByPig.avro";
      testAvroStorage(true, basedir + "code/pig/identity_ao2.pig",
          ImmutableMap.of(
              "INFILE",           input,
              "AVROSTORAGE_OUT_1", "records",
              "AVROSTORAGE_OUT_2", "-n org.apache.pig.test.builtin",
              "OUTFILE",          createOutputName())
        );
      verifyResults(createOutputName(),check);
    }

    @Test
    public void testLoadSnappyCompressedRecords() throws Exception {
      final String input = basedir + "data/avro/compressed/snappy/records.avro";
      final String check = basedir + "data/avro/uncompressed/recordsAsOutputByPig.avro";
      testAvroStorage(true, basedir + "code/pig/identity_ao2.pig",
          ImmutableMap.of(
              "INFILE",           input,
              "AVROSTORAGE_OUT_1", "records",
              "AVROSTORAGE_OUT_2", "-n org.apache.pig.test.builtin",
              "OUTFILE",          createOutputName())
        );
      verifyResults(createOutputName(),check);
    }

    @Test
    public void testStoreDeflateCompressedRecords() throws Exception {
      final String input = basedir + "data/avro/uncompressed/records.avro";
      final String check = basedir + "data/avro/compressed/deflate/recordsAsOutputByPig.avro";
      testAvroStorage(true, basedir + "code/pig/identity_codec.pig",
          ImmutableMap.<String,String>builder()
            .put("INFILE",input)
            .put("OUTFILE",createOutputName())
            .put("AVROSTORAGE_OUT_1", "records")
            .put("AVROSTORAGE_OUT_2", "-n org.apache.pig.test.builtin")
            .put("CODEC", "deflate")
            .put("LEVEL", "6")
            .build()
           );
      verifyResults(createOutputName(),check);
    }

    @Test
    public void testStoreSnappyCompressedRecords() throws Exception {
      final String input = basedir + "data/avro/uncompressed/records.avro";
      final String check = basedir + "data/avro/compressed/snappy/recordsAsOutputByPig.avro";
      testAvroStorage(true, basedir + "code/pig/identity_codec.pig",
          ImmutableMap.<String,String>builder()
          .put("INFILE",input)
          .put("OUTFILE",createOutputName())
          .put("AVROSTORAGE_OUT_1", "records")
          .put("AVROSTORAGE_OUT_2", "-n org.apache.pig.test.builtin")
          .put("CODEC", "snappy")
          .put("LEVEL", "6")
          .build()
         );
     verifyResults(createOutputName(),check);
    }

    @Test
    public void testLoadRecursiveRecords() throws Exception {
      final String input = basedir + "data/avro/uncompressed/recursiveRecord.avro";
      testAvroStorage(false, basedir + "code/pig/recursive_tests.pig",
          ImmutableMap.of(
              "INFILE",           input,
              "AVROSTORAGE_IN_2", "-n this.is.ignored.in.a.loadFunc",
              "AVROSTORAGE_OUT_1", "recordsSubSchemaNullable",
              "AVROSTORAGE_OUT_2", "-n org.apache.pig.test.builtin",
              "OUTFILE",          createOutputName())
        );
    }

    @Test
    public void testLoadRecursiveRecordsOptionOn() throws Exception {
      final String input = basedir + "data/avro/uncompressed/recursiveRecord.avro";
      final String check = basedir + "data/avro/uncompressed/recordsSubSchemaNullable.avro";
      testAvroStorage(true, basedir + "code/pig/recursive_tests.pig",
          ImmutableMap.of(
              "INFILE",           input,
              "AVROSTORAGE_IN_2", "-r",
              "AVROSTORAGE_OUT_1", "recordsSubSchemaNullable",
              "AVROSTORAGE_OUT_2", "-n org.apache.pig.test.builtin",
              "OUTFILE",          createOutputName())
        );
      verifyResults(createOutputName(),check);
    }

    @Test
    public void testLoadRecordsWithRepeatedSubRecords() throws Exception {
      final String input = basedir + "data/avro/uncompressed/recordWithRepeatedSubRecords.avro";
      final String check = basedir + "data/avro/uncompressed/recordWithRepeatedSubRecords.avro";
      testAvroStorage(true, basedir + "code/pig/identity_just_ao2.pig",
          ImmutableMap.of(
              "INFILE",           input,
              "AVROSTORAGE_OUT_2", "-f " + basedir + "schema/recordWithRepeatedSubRecords.avsc",
              "OUTFILE",          createOutputName())
        );
      verifyResults(createOutputName(),check);
    }

    @Test
    public void testGroupWithRepeatedSubRecords() throws Exception {
      final String input = basedir + "data/avro/uncompressed/recordWithRepeatedSubRecords.avro";
      final String check = basedir + "data/avro/uncompressed/recordWithRepeatedSubRecords.avro";
      testAvroStorage(true, basedir + "code/pig/group_test.pig",
          ImmutableMap.of(
              "INFILE",           input,
              "AVROSTORAGE_OUT_2", "-f " + basedir + "schema/recordWithRepeatedSubRecords.avsc",
              "OUTFILE",          createOutputName())
        );
      verifyResults(createOutputName(),check);
    }

    @Test
    public void testLoadDirectory() throws Exception {
      final String input = basedir + "data/avro/uncompressed/testdirectory";
      final String check = basedir + "data/avro/uncompressed/testDirectoryCounts.avro";
      testAvroStorage(true, basedir + "code/pig/directory_test.pig",
          ImmutableMap.of(
              "INFILE",           input,
              "AVROSTORAGE_OUT_1", "stats",
              "AVROSTORAGE_OUT_2", "-n org.apache.pig.test.builtin",
              "OUTFILE",          createOutputName())
        );
      verifyResults(createOutputName(),check);
    }

    @Test
    public void testLoadGlob() throws Exception {
      final String input = basedir + "data/avro/uncompressed/testdirectory/part-m-0000*";
      final String check = basedir + "data/avro/uncompressed/testDirectoryCounts.avro";
      testAvroStorage(true, basedir + "code/pig/directory_test.pig",
          ImmutableMap.of(
              "INFILE",           input,
              "AVROSTORAGE_OUT_1", "stats",
              "AVROSTORAGE_OUT_2", "-n org.apache.pig.test.builtin",
              "OUTFILE",          createOutputName())
        );
      verifyResults(createOutputName(),check);
    }

    @Test
    public void testPartialLoadGlob() throws Exception {
      final String input = basedir + "data/avro/uncompressed/testdirectory/part-m-0000{0,2,4,6}.avro";
      final String check = basedir + "data/avro/uncompressed/evenFileNameTestDirectoryCounts.avro";
      testAvroStorage(true, basedir + "code/pig/directory_test.pig",
          ImmutableMap.of(
              "INFILE",           input,
              "AVROSTORAGE_OUT_1", "stats",
              "AVROSTORAGE_OUT_2", "-n org.apache.pig.test.builtin",
              "OUTFILE",          createOutputName())
        );
      verifyResults(createOutputName(),check);
    }

    @Test
    public void testSeparatedByComma() throws Exception {
        final String temp = basedir
                + "data/avro/uncompressed/testdirectory/part-m-0000";
        StringBuffer sb = new StringBuffer();
        sb.append(temp + "0.avro");
        for (int i = 1; i <= 7; ++i) {
            sb.append(",");
            sb.append(temp + String.valueOf(i) + ".avro");
        }
        final String input = sb.toString();
        final String check = basedir
                + "data/avro/uncompressed/testDirectoryCounts.avro";
        testAvroStorage(true, basedir + "code/pig/directory_test.pig",
                ImmutableMap.of("INFILE", input, "AVROSTORAGE_OUT_1", "stats",
                        "AVROSTORAGE_OUT_2", "-n org.apache.pig.test.builtin",
                        "OUTFILE", createOutputName()));
        verifyResults(createOutputName(), check);
    }

    @Test
    public void testDoubleUnderscore() throws Exception {
      final String input = basedir + "data/avro/uncompressed/records.avro";
      final String check = basedir + "data/avro/uncompressed/recordsWithDoubleUnderscores.avro";
      testAvroStorage(true, basedir + "code/pig/namesWithDoubleColons.pig",
          ImmutableMap.of(
              "INFILE",           input,
              "AVROSTORAGE_OUT_1", "recordsWithDoubleUnderscores",
              "AVROSTORAGE_OUT_2", "-d -n org.apache.pig.test.builtin",
              "OUTFILE",          createOutputName())
        );
      verifyResults(createOutputName(),check);
    }

    @Test
    public void testDoubleUnderscoreNoFlag() throws Exception {
      final String input = basedir + "data/avro/uncompressed/records.avro";
      testAvroStorage(false, basedir + "code/pig/namesWithDoubleColons.pig",
          ImmutableMap.of(
              "INFILE",           input,
              "AVROSTORAGE_OUT_1", "recordsWithDoubleUnderscores",
              "AVROSTORAGE_OUT_2", "-n org.apache.pig.test.builtin",
              "OUTFILE",          createOutputName())
        );
    }

    @Test
    public void testLoadArrays() throws Exception {
      final String input = basedir + "data/avro/uncompressed/arrays.avro";
      final String check = basedir + "data/avro/uncompressed/arraysAsOutputByPig.avro";
      testAvroStorage(true, basedir + "code/pig/identity_ao2.pig",
          ImmutableMap.of(
              "INFILE",           input,
              "AVROSTORAGE_OUT_1", "arrays",
              "AVROSTORAGE_OUT_2", "-n org.apache.pig.test.builtin",
              "OUTFILE",          createOutputName())
        );
      verifyResults(createOutputName(),check);
    }

    @Test
    public void testLoadTrevniRecords() throws Exception {
      final String input = basedir + "data/trevni/uncompressed/simpleRecordsTrevni.trevni";
      final String check = basedir + "data/avro/uncompressed/simpleRecordsTrevni.avro";
      testAvroStorage(true, basedir + "code/pig/trevni_to_avro.pig",
          ImmutableMap.of(
              "INFILE",           input,
              "AVROSTORAGE_OUT_2", "-f " + basedir + "schema/simpleRecordsTrevni.avsc",
              "OUTFILE",          createOutputName())
        );
      verifyResults(createOutputName(),check);
    }

    @Test
    public void testLoadAndSaveTrevniRecords() throws Exception {
      final String input = basedir + "data/trevni/uncompressed/simpleRecordsTrevni.trevni";
      final String check = basedir + "data/avro/uncompressed/simpleRecordsTrevni.avro";

      testAvroStorage(true, basedir + "code/pig/trevni_to_trevni.pig",
          ImmutableMap.of(
              "INFILE",           input,
              "AVROSTORAGE_OUT_2", "-f " + basedir + "schema/simpleRecordsTrevni.avsc",
              "OUTFILE",          createOutputName() + "Trevni")
        );

      testAvroStorage(true, basedir + "code/pig/trevni_to_avro.pig",
          ImmutableMap.of(
              "INFILE",           createOutputName() + "Trevni",
              "AVROSTORAGE_OUT_2", "-f " + basedir + "schema/simpleRecordsTrevni.avsc",
              "OUTFILE",          createOutputName())
        );
      verifyResults(createOutputName(), check);
    }

    @Test
    public void testRetrieveDataFromMap() throws Exception {
        pigServerLocal = new PigServer(Util.getLocalTestMode());
        Data data = resetData(pigServerLocal);
        Map<String, String> mapv1 = new HashMap<String, String>();
        mapv1.put("key1", "v11");
        mapv1.put("key2", "v12");
        Map<String, String> mapv2 = new HashMap<String, String>();
        mapv2.put("key1", "v21");
        mapv2.put("key2", "v22");
        data.set("testMap", "maps:map[chararray]", tuple(mapv1), tuple(mapv2));
        String schemaDescription = new String(
                "{" +
                      "\"type\": \"record\"," +
                      "\"name\": \"record\"," +
                      "\"fields\" : [" +
                      "{\"name\" : \"maps\", \"type\" :{\"type\" : \"map\", \"values\" : \"string\"}}" +
                      "]" +
                      "}");
        pigServerLocal.registerQuery("A = LOAD 'testMap' USING mock.Storage();");
        pigServerLocal.registerQuery("STORE A INTO '" + createOutputName() + "' USING AvroStorage('"+ schemaDescription +"');");
        pigServerLocal.registerQuery("B = LOAD '" + createOutputName() + "' USING AvroStorage();");
        pigServerLocal.registerQuery("C = FOREACH B generate maps#'key1';");
        pigServerLocal.registerQuery("STORE C INTO 'out' USING mock.Storage();");

        List<Tuple> out = data.get("out");
        assertEquals(tuple("v11"), out.get(0));
        assertEquals(tuple("v21"), out.get(1));
    }

    @SuppressWarnings("rawtypes")
    @Test
    public void testCompareToOfBagWrapper() throws Exception {
        final String check = basedir + "data/avro/uncompressed/arraysAsOutputByPig.avro";

        Set<GenericData.Record> records = getExpected(check);
        assertEquals(3, records.size());

        AvroBagWrapper size0 = null; // [ ]
        AvroBagWrapper size1 = null; // [ 6 ]
        AvroBagWrapper size5 = null; // [ 1, 2, 3, 4, 5 ]

        // 3 arrays in records are in an arbitrary order. We re-order them by
        // their size.
        for (GenericData.Record record : records) {
            AvroTupleWrapper tw = new AvroTupleWrapper<GenericData.Record>(record);
            if (((AvroBagWrapper)tw.get(0)).size() == 0) {
                size0 = (AvroBagWrapper)tw.get(0);
            } else if (((AvroBagWrapper)tw.get(0)).size() == 1) {
                size1 = (AvroBagWrapper)tw.get(0);
            } else if (((AvroBagWrapper)tw.get(0)).size() == 5) {
                size5 = (AvroBagWrapper)tw.get(0);
            }
        }

        assertEquals(0, size0.size());
        assertEquals(1, size1.size());
        assertEquals(5, size5.size());

        assertTrue(size0.compareTo(size0) == 0);
        assertTrue(size0.compareTo(size1) < 0);
        assertTrue(size0.compareTo(size5) < 0);
        assertTrue(size1.compareTo(size0) > 0);
        // 6 > 1, so size1 > size5 even though size1 is smaller than size5.
        assertTrue(size1.compareTo(size5) > 0);
    }

    @Test
    public void testUtf8KeyLookupFromMap() throws Exception {
        Map<CharSequence, Object> tm = new TreeMap<CharSequence, Object> ();
        tm.put(new Utf8("foo"), "foo");
        tm.put(new Utf8("bar"), "bar");

        AvroMapWrapper wrapper = new AvroMapWrapper(tm);
        String v = (String)wrapper.get(new Utf8("foo"));
        assertEquals("foo", v);
        v = (String)wrapper.get(new Utf8("bar"));
        assertEquals("bar", v);
    }

    @Test
    public void testAvroMapWrapper() throws Exception {
        final Map<CharSequence, Object> m = new HashMap<CharSequence, Object>();
        for (String fn : avroSchemas) {
            final String avro = basedir + "data/avro/uncompressed/" + fn + ".avro";
            int i = 0;
            for (GenericContainer r : readAvroData(avro)) {
                m.put(new Utf8(fn + i), r);
                i += 1;
            }
        }
        final AvroMapWrapper amw = new AvroMapWrapper(m);
        // Test out all the interfaces the AvroMapWrapper supports
        for (Object o : amw.values()) {
            assertTrue(isValidPigObject(o));
        }
        for (CharSequence k : amw.keySet()) {
            assertTrue(isValidPigObject(k));
            assertTrue(isValidPigObject(amw.get(k)));
        }
        for (Map.Entry<CharSequence, Object> e : amw.entrySet()) {
            assertTrue(isValidPigObject(e.getKey()));
            assertTrue(isValidPigObject(e.getValue()));
        }
    }

    private boolean isValidPigObject(Object o) {
        if (o == null) {
            return true;
        }
        switch (DataType.findType(o)) {
            case DataType.TUPLE:
                for (Object inner : ((Tuple) o).getAll()) {
                    if (!isValidPigObject(inner)) {
                        return false;
                    }
                }
                return true;
            case DataType.BAG:
                final Iterator<Tuple> bi = ((DataBag) o).iterator();
                while (bi.hasNext()) {
                    if (!isValidPigObject(bi.next())) {
                        return false;
                    }
                }
                return true;
            case DataType.MAP:
                for (Object inner : ((Map) o).values()) {
                    if (!isValidPigObject(inner)) {
                        return false;
                    }
                }
                return true;
            case DataType.BIGDECIMAL:
            case DataType.BIGINTEGER:
            case DataType.BOOLEAN:
            case DataType.BYTE:
            case DataType.BYTEARRAY:
            case DataType.CHARARRAY:
            case DataType.DATETIME:
            case DataType.DOUBLE:
            case DataType.FLOAT:
            case DataType.GENERIC_WRITABLECOMPARABLE:
            case DataType.INTEGER:
            case DataType.LONG:
                return true;
            case DataType.ERROR:
            default:
                return false;
        }
    }

    private List<GenericContainer> readAvroData(String path) throws IOException {
        final FileSystem fs = FileSystem.getLocal(new Configuration());
        final Path filePath = new Path(path);
        assertTrue("File path " + filePath + " does not exists!", fs.exists(filePath));
        final GenericDatumReader<GenericContainer> reader = new GenericDatumReader<GenericContainer>();
        final DataFileStream<GenericContainer> in = new DataFileStream<GenericContainer>(fs.open(filePath), reader);
        final List<GenericContainer> avroData = new ArrayList<GenericContainer>();
        try {
            while (in.hasNext()) {
                GenericContainer obj = in.next();
                avroData.add(obj);
            }
        } finally {
            Closeables.closeQuietly(in);
        }
        return avroData;
    }

    private void testAvroStorage(boolean expectedToSucceed, String scriptFile, Map<String,String> parameterMap) throws IOException {
        pigServerLocal.setBatchOn();

        int numOfFailedJobs = 0;

        try {
          pigServerLocal.registerScript(scriptFile, parameterMap);
          for (ExecJob job : pigServerLocal.executeBatch()) {
            if (job.getStatus().equals(JOB_STATUS.FAILED)) {
                  numOfFailedJobs++;
              }
          }
        } catch (Exception e) {
          System.err.printf("Exception caught in testAvroStorage: %s\n", e);
          numOfFailedJobs++;
        }

        if (expectedToSucceed) {
          assertTrue("There was a failed job!", numOfFailedJobs == 0);
        } else {
          assertTrue("There was no failed job!", numOfFailedJobs > 0);
        }
    }

    private void verifyResults(String outPath, String expectedOutpath) throws IOException {
        verifyResults(outPath, expectedOutpath, null);
    }

    private void verifyResults(String outPath, String expectedOutpath, String expectedCodec) throws IOException {
        FileSystem fs = FileSystem.getLocal(new Configuration()) ;

        /* read in expected results*/
        Set<GenericData.Record> expected = getExpected (expectedOutpath);

        /* read in output results and compare */
        Path output = new Path(outPath);
        assertTrue("Output dir does not exists!", fs.exists(output)
                && fs.getFileStatus(output).isDir());

        Path[] paths = FileUtil.stat2Paths(fs.listStatus(output, hiddenPathFilter));
        assertTrue("Split field dirs not found!", paths != null);

        for (Path path : paths) {
          Path[] files = FileUtil.stat2Paths(fs.listStatus(path, hiddenPathFilter));
          assertTrue("No files found for path: " + path.toUri().getPath(),
                  files != null);
          for (Path filePath : files) {
            assertTrue("This shouldn't be a directory", fs.isFile(filePath));

            GenericDatumReader<GenericData.Record> reader = new GenericDatumReader<GenericData.Record>();

            DataFileStream<GenericData.Record> in = new DataFileStream<GenericData.Record>(
                                            fs.open(filePath), reader);
            assertEquals("codec", expectedCodec, in.getMetaString("avro.codec"));
            int count = 0;
            while (in.hasNext()) {
                GenericData.Record obj = in.next();
                assertTrue("Avro result object found that's not expected: Found "
                        + (obj != null ? obj.getSchema() : "null") + ", " + obj.toString()
                        + "\nExpected " + (expected != null ? expected.toString() : "null") + "\n"
                        , expected.contains(obj));
                count++;
            }
            in.close();
            assertEquals(expected.size(), count);
          }
        }
    }

    private Set<GenericData.Record> getExpected (String pathstr ) throws IOException {

        Set<GenericData.Record> ret = new TreeSet<GenericData.Record>(
                new Comparator<GenericData.Record>() {
                    @Override
                    public int compare(Record o1, Record o2) {
                        //return o1.compareTo(o2); throws AvroRuntimeException: Can't compare maps!
                        return o1.equals(o2) ? 0 : o1.toString().compareTo(o2.toString());
                    }}
                );
        FileSystem fs = FileSystem.getLocal(new Configuration());

        /* read in output results and compare */
        Path output = new Path(pathstr);
        assertTrue("Expected output does not exists!", fs.exists(output));

        Path[] paths = FileUtil.stat2Paths(fs.listStatus(output, hiddenPathFilter));
        assertTrue("Split field dirs not found!", paths != null);

        for (Path path : paths) {
            Path[] files = FileUtil.stat2Paths(fs.listStatus(path, hiddenPathFilter));
            assertTrue("No files found for path: " + path.toUri().getPath(), files != null);
            for (Path filePath : files) {
                assertTrue("This shouldn't be a directory", fs.isFile(filePath));

                GenericDatumReader<GenericData.Record> reader = new GenericDatumReader<GenericData.Record>();

                DataFileStream<GenericData.Record> in = new DataFileStream<GenericData.Record>(fs.open(filePath), reader);

                while (in.hasNext()) {
                    GenericData.Record obj = in.next();
                    ret.add(obj);
                }
                in.close();
            }
        }
        return ret;
    }

}

