[553] parquet source with partition extraction (#728)

* First attempt at parquet source with partition extraction

* refactored conversion source for reading parquetFiles once to getCurrentSnapshot()

* file changes + refactoring of conversion source

* cleanup

* implement get commit identifier

* Hudi Source Config uses PartitionFieldSpec.java

* Hudi Source Config uses PartitionFieldSpec.java bug fix

* refactoring for PartitionValueExtractor to use Hudi based related code

* aligning ConversionSource tests to include Parquet

* implemented remaining parquetSource methods + refactoring, TODO tests

* default case as an exception for finding parquet files + javadocs for PartitionFieldSpec + no file diffs removed files

* refactoring for specExtractor

* table changes update does not check if table exists already

* added test for conversion source

* adjusted test for Hudi after update of HudiSourceConfig

* fixed test bug for parquet and added partitionValues for statsExtractor

* fixed test bug for stats extractor by defining instance obj instead of using static function

* now using a string for the partition format input

* conversion source const for the test using specExtractor

* adjusted conversionSourceProvider for partition init

* adjusted table creation with Spark for Parquet

* fix tests for CI to pass

* fix tests for CI to pass (reverted)

* cleanups

* parquet files collected in a stream now for memory issue

* fixing CI error on partition config in tests

* separate test class for parquet source

* adjusting pom config for Actions

* tests hitting the source, needs to create the parquet files locally in the test through Spark

* creating parquet files before Testing

* test bug fix

* cleanups

* TODO implement class for test

* TODO check why sync fails to all 3 formats

* sourceField bug not null OK, sync failing on stat generation from InternalType

* highlighting sync errors related to stats and partition fields

* partition config read from the test class + cleanups

* augmenting InternalSchema with the partitionField to get SourceField non-null

* HudiSourceConfig const param type fix

* duplicate col for partitions

* delta stats serialization from parquet data bug fixed

* removed old code and mvn spotless apply

* fixed partitioning by year in tests

* timestamp col is of type timestamp not string (todo schemaExtractor logicalTypes for INT96)

* Delta sync OK, stat conversion issue for Iceberg, metadata issue for Hudi

* Stat conversion patch for string (ICEBERG)

* Parsing Date Values from Partition Path, ICEBERG & DELTA Sync OK

* fix for Binary Stats conversion in Hudi + upgraded Parquet Version

* set recordKey Fields for Hudi Metadata + spotless:apply

* write data in a data path for proper reading of sourceTable after sync (Tests)

* set parquet schema for reading the dataset (Tests)

* bug fix of dataPath (Tests)

* different metadata path than the parquet file's to prevent reading confusion related to paths (Tests)

* read parquet sparksession for tests

* read parquet sparksession for tests

* update

* reading part col from path disabled

* spotless apply and CI pass error fixed

* CI pass error fixed

* CI pass error fixed

* CI pass error fixed

* CI pass error fixed

* CI pass error fixed

* refactored parquetSource + cleanups

* fix schemaExtractor bug

* fix Parquet related partition values extraction from path

* fix for Iceberg sync tests for CI to pass

* binary stats are converted into string stats in parquet stats extractor

* binary stats converted per logical type case: STRING....

* testing dataset equivalence differently for Hudi (partition column removed)

* set source identifier for current snapshot as the last modification time

* set spark config for CI

* reformatting + cleanups

* iceberg CI error fix

* delta CI error fix

* delta CI error fix

* refactoring for the statsExtractor

* refactoring for the statsExtractor

* refactoring for the statsExtractor

* many partitions as input for parquet

* many partitions as input for parquet

* revert change in hudi spec extractor

* fix for isNullable records

* fix for isNullable records

* cleanups

* cleanups

* revert hudi version to 0.14.0

* spotless:apply

* added test for non partitioned data + adjusted for cases of Month, Day partitions

* resolving the merge conflict1

* resolving the merge conflict1

* resolving the merge conflict2

* fix for Iceberg for (same key) partition values merge conflict

* fix for reading the datasets in the test class + disable equivalence test for Hudi due to difference in partition values

* spotless:apply

* added boolean type to the testing dataset

* reformatting

* cleanups

* refactoring + reformatting

* more data types in the testing dataset + refactoring

* refactor test to separate partitioned and non-partitioned cases

* refactor test to separate partitioned and non-partitioned cases

* refactor test to separate partitioned and non-partitioned cases

* refactor test to separate partitioned and non-partitioned cases

* refactor test to separate partitioned and non-partitioned cases

* refactor test to separate partitioned and non-partitioned cases

* refactor test to separate partitioned and non-partitioned cases

* reformatting

* re-read partitioned data -> create new data with additional rows -> partition again ->sync again

* fix writing second dataset bug

* fix writing second dataset bug

* fix datasets equivalence bug

* reformatting

* reformatting

* add more data then append to partitioned file then sync again: test added

* add more data then append to partitioned file then sync again: test added

* reformatting

* cleanups

* add test for Hudi format for non-partitioned data

* bug fix iceberg partition values

* bug fix iceberg partition values: revert Iceberg code

* parse partition values fix

* fix values parsing by lowercasing the partitioning config

* fix values parsing by lowercasing the partitioning config 2

* isIncrementalSafe returns false + cleanups + formatting

* address remaining feedback, remove unnecessary changes, cleanup

---------

Co-authored-by: Selim Soufargi <ssoufargi.idealab.unical@gmail.com>
Co-authored-by: Timothy Brown <tim@onehouse.ai>
29 files changed
tree: 223eb28122f777e48d3643e0e6459abc4b8a0502
README.md

Apache XTable™ (Incubating)


Apache XTable™ (Incubating) is a cross-table converter for table formats that facilitates omni-directional interoperability across data processing systems and query engines. Currently, Apache XTable™ supports widely adopted open-source table formats such as Apache Hudi, Apache Iceberg, and Delta Lake.

Apache XTable™ simplifies data lake operations by leveraging a common model for table representation. This allows users to write data in one format while still benefiting from integrations and features available in other formats. For instance, Apache XTable™ enables existing Hudi users to seamlessly work with Databricks's Photon Engine or query Iceberg tables with Snowflake. Creating transformations from one format to another is straightforward and only requires the implementation of a few interfaces, which we believe will facilitate the expansion of supported source and target formats in the future.

Building the project and running tests.

  1. Use Java 11 for building the project. If you are using another Java version, you can use jenv to manage multiple Java versions locally.
  2. Build the project using mvn clean package. Use mvn clean package -DskipTests to skip tests while building.
  3. Use mvn clean test or mvn test to run all unit tests. If you need to run only a specific test, you can do so with something like mvn test -Dtest=TestDeltaSync -pl xtable-core.
  4. Similarly, use mvn clean verify or mvn verify to run integration tests.

Note: When using Maven version 3.9 or above, Maven automatically caches the build. To ignore build caching, you can add the -Dmaven.build.cache.enabled=false parameter. For example, mvn clean package -DskipTests -Dmaven.build.cache.enabled=false

Style guide

  1. We use the Maven Spotless plugin and Google Java Format for code style.
  2. Use mvn spotless:check to find code style violations and mvn spotless:apply to fix them. The code style check is tied to the compile phase by default, so code style violations will lead to build failures.

Running the bundled jar

  1. Get a pre-built bundled jar or create the jar with mvn install -DskipTests
  2. Create a yaml file that follows the format below:
sourceFormat: HUDI
targetFormats:
  - DELTA
  - ICEBERG
datasets:
  -
    tableBasePath: s3://tpc-ds-datasets/1GB/hudi/call_center
    tableDataPath: s3://tpc-ds-datasets/1GB/hudi/call_center/data
    tableName: call_center
    namespace: my.db
  -
    tableBasePath: s3://tpc-ds-datasets/1GB/hudi/catalog_sales
    tableName: catalog_sales
    partitionSpec: cs_sold_date_sk:VALUE
  -
    tableBasePath: s3://hudi/multi-partition-dataset
    tableName: multi_partition_dataset
    partitionSpec: time_millis:DAY:yyyy-MM-dd,type:VALUE
  -
    tableBasePath: abfs://container@storage.dfs.core.windows.net/multi-partition-dataset
    tableName: multi_partition_dataset
  • sourceFormat is the format of the source table that you want to convert
  • targetFormats is a list of formats you want to create from your source tables
  • tableBasePath is the basePath of the table
  • tableDataPath is an optional field specifying the path to the data files. If not specified, the tableBasePath will be used. For Iceberg source tables, you will need to specify the /data path.
  • namespace is an optional field specifying the namespace of the table and will be used when syncing to a catalog.
  • partitionSpec is a spec that allows us to infer partition values. This is only required for Hudi source tables. If the table is not partitioned, leave it blank. If it is partitioned, you can specify a spec with a comma-separated list in the format path:type:format (see the example after this list)
    • path is a dot separated path to the partition field
    • type describes how the partition value was generated from the column value
      • VALUE: an identity transform of field value to partition value
      • YEAR: data is partitioned by a field representing a date and year granularity is used
      • MONTH: same as YEAR but with month granularity
      • DAY: same as YEAR but with day granularity
      • HOUR: same as YEAR but with hour granularity
    • format: if your partition type is YEAR, MONTH, DAY, or HOUR, specify the format for the date string as it appears in your file paths
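For example, a Hudi table partitioned by a nested date field at day granularity (with the date appearing in the file paths as yyyy-MM-dd) and by a top-level category column could be described as sketched below; the bucket path and the field names event_metadata.created_date and region are hypothetical and only illustrate the path:type:format syntax:
datasets:
  -
    tableBasePath: s3://my-bucket/hypothetical-partitioned-dataset
    tableName: hypothetical_partitioned_dataset
    # event_metadata.created_date is a nested field addressed with a dot-separated path;
    # it is partitioned at DAY granularity and rendered in the paths as yyyy-MM-dd.
    # region uses an identity (VALUE) transform, so no format component is needed.
    partitionSpec: event_metadata.created_date:DAY:yyyy-MM-dd,region:VALUE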
  3. The default implementations of table format converters can be replaced with custom implementations by specifying a converter configs yaml file in the format below:
# conversionSourceProviderClass: The class name of a table format's converter factory, where the converter is
#     used for reading from a table of this format. All user configurations, including hadoop config
#     and converter specific configuration, will be available to the factory for instantiation of the
#     converter.
# conversionTargetProviderClass: The class name of a table format's converter factory, where the converter is
#     used for writing to a table of this format.
# configuration: A map of configuration values specific to this converter.
tableFormatConverters:
    HUDI:
      conversionSourceProviderClass: org.apache.xtable.hudi.HudiConversionSourceProvider
    DELTA:
      conversionTargetProviderClass: org.apache.xtable.delta.DeltaConversionTarget
      configuration:
        spark.master: local[2]
        spark.app.name: xtable
  4. A catalog can be used when reading and updating Iceberg tables. The catalog can be specified in a yaml file and passed in with the --icebergCatalogConfig option. The format of the catalog config file is:
catalogImpl: io.my.CatalogImpl
catalogName: name
catalogOptions: # all other options are passed through in a map
  key1: value1
  key2: value2
  5. Run with java -jar xtable-utilities/target/xtable-utilities_2.12-0.2.0-SNAPSHOT-bundled.jar --datasetConfig my_config.yaml [--hadoopConfig hdfs-site.xml] [--convertersConfig converters.yaml] [--icebergCatalogConfig catalog.yaml]. The bundled jar includes Hadoop dependencies for AWS, Azure, and GCP. Sample Hadoop configurations for configuring the converters can be found in the xtable-hadoop-defaults.xml file. Custom Hadoop configurations can be passed in with the --hadoopConfig [custom-hadoop-config-file] option and will override the default Hadoop configurations. For an example of a custom Hadoop config file, see hadoop.xml.

Running using docker

  1. Build the docker image using docker build . -t xtable
  2. Mount the config files on the container and run the container:
docker run \
  -v ./xtable/config.yml:/xtable/config.yml \
  -v ./xtable/core-site.xml:/xtable/core-site.xml \
  -v ./xtable/catalog.yml:/xtable/catalog.yml \
  xtable \
  --datasetConfig /xtable/config.yml --hadoopConfig /xtable/core-site.xml --icebergCatalogConfig /xtable/catalog.yml

Contributing

Setup

For setting up the repo on IntelliJ, open the project and change the Java version to Java 11 in File -> Project Structure.

Found a bug or have a cool idea to contribute? Open a GitHub issue to get started. For more contribution guidelines and ways to stay involved, visit our community page.

Adding a new target format

Adding a new target format requires a developer to implement ConversionTarget. Once you have implemented that interface, you can integrate it into the ConversionController. If you think others may find that target useful, please raise a Pull Request to add it to the project.
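
As a rough sketch of how a new target would then be wired in, the converter configs yaml described under "Running the bundled jar" above could point at your implementation. The format key MYFORMAT and the class com.example.MyFormatConversionTarget below are placeholders, not classes that exist in the project:
tableFormatConverters:
    MYFORMAT:
      conversionTargetProviderClass: com.example.MyFormatConversionTarget
      configuration:
        # hypothetical converter-specific settings, passed through to the converter
        my.format.option: some-value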

Overview of the sync process

A diagram illustrating the sync process is included in the repository.