# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import time
from datetime import date

import pytest
from pyspark.sql import SparkSession

from pyiceberg.catalog import Catalog


@pytest.mark.integration
def test_migrate_table(
    session_catalog_hive: Catalog,
    spark: SparkSession,
) -> None:
"""
Imported tables are an edge case since the partition column is not stored
in the Parquet files:
test_migrate_table_hive_1754486926/dt=2022-01-01/part-00000-30a9798b-7597-4027-86d9-79d7c529bc87.c000.snappy.parquet
{
"type" : "record",
"name" : "spark_schema",
"fields" : [ {
"name" : "number",
"type" : "int"
} ]
}
PyIceberg will project this column when the table is being read
"""
    # Create new tables to avoid complex cleanup
    src_table_identifier = f"spark_catalog.default.test_migrate_table_hive_{int(time.time())}"
    dst_table_identifier = f"default.test_migrate_table_{int(time.time())}"
spark.sql(f"""
CREATE TABLE {src_table_identifier} (
number INTEGER
)
PARTITIONED BY (dt date)
STORED AS parquet
""")
spark.sql(f"""
INSERT OVERWRITE TABLE {src_table_identifier}
PARTITION (dt='2022-01-01')
VALUES (1), (2), (3)
""")
spark.sql(f"""
INSERT OVERWRITE TABLE {src_table_identifier}
PARTITION (dt='2023-01-01')
VALUES (4), (5), (6)
""")

    # Docs: https://iceberg.apache.org/docs/latest/hive-migration/#snapshot-hive-table-to-iceberg
    spark.sql(f"""
        CALL hive.system.snapshot('{src_table_identifier}', 'hive.{dst_table_identifier}')
    """)

    tbl = session_catalog_hive.load_table(dst_table_identifier)
    assert tbl.schema().column_names == ["number", "dt"]
    assert set(tbl.scan().to_arrow().column(1).combine_chunks().tolist()) == {date(2023, 1, 1), date(2022, 1, 1)}
    assert tbl.scan(row_filter="number > 3").to_arrow().column(0).combine_chunks().tolist() == [4, 5, 6]
    assert tbl.scan(row_filter="dt == '2023-01-01'").to_arrow().column(0).combine_chunks().tolist() == [4, 5, 6]
    assert tbl.scan(row_filter="dt == '2022-01-01'").to_arrow().column(0).combine_chunks().tolist() == [1, 2, 3]
    assert tbl.scan(row_filter="dt < '2022-02-01'").to_arrow().column(0).combine_chunks().tolist() == [1, 2, 3]