Changes
diff --git a/core/src/test/java/io/onetable/GenericTable.java b/core/src/test/java/io/onetable/GenericTable.java
index 3eda7c4..0e7fa56 100644
--- a/core/src/test/java/io/onetable/GenericTable.java
+++ b/core/src/test/java/io/onetable/GenericTable.java
@@ -125,6 +125,39 @@
}
}
+ static GenericTable getInstanceWithCommonSchema(
+ String tableName,
+ Path tempDir,
+ SparkSession sparkSession,
+ JavaSparkContext jsc,
+ TableFormat sourceFormat) {
+ switch (sourceFormat) {
+ case HUDI:
+ return TestSparkHudiTable.forGivenSchemaAndPartitioning(
+ tableName,
+ tempDir,
+ jsc,
+ TestCommonTableHelper.getCommonSchemaInAvroFormat(),
+ TestCommonTableHelper.getHudiPartitionConfig());
+ case DELTA:
+ return TestSparkDeltaTable.forGivenSchemaAndPartitioning(
+ tableName,
+ tempDir,
+ sparkSession,
+ TestCommonTableHelper.getCommonSchemaInStructFormat(),
+ TestCommonTableHelper.getDeltaPartitionField());
+ case ICEBERG:
+ return TestIcebergTable.forGivenSchemaAndPartitioning(
+ tableName,
+ tempDir,
+ jsc.hadoopConfiguration(),
+ TestCommonTableHelper.getCommonSchemaInIcebergFormat(),
+ TestCommonTableHelper.getIcebergPartitionField());
+ default:
+ throw new IllegalArgumentException("Unsupported source format: " + sourceFormat);
+ }
+ }
+
static String getTableName() {
return "test_table_" + UUID.randomUUID().toString().replaceAll("-", "_");
}
diff --git a/core/src/test/java/io/onetable/ITOneTableClient.java b/core/src/test/java/io/onetable/ITOneTableClient.java
index 7609cf8..0ca9d97 100644
--- a/core/src/test/java/io/onetable/ITOneTableClient.java
+++ b/core/src/test/java/io/onetable/ITOneTableClient.java
@@ -670,6 +670,148 @@
}
}
+ @Test
+ public void roundTripTesting() {
+ SourceClientProvider<?> hudiSourceClientProvider = getSourceClientProvider(TableFormat.HUDI);
+ SourceClientProvider<?> icebergSourceClientProvider =
+ getSourceClientProvider(TableFormat.ICEBERG);
+ SourceClientProvider<?> deltaSourceClientProvider = getSourceClientProvider(TableFormat.DELTA);
+ String onetablePartitionConfig = "level:VALUE";
+ SyncMode syncMode = SyncMode.INCREMENTAL;
+
+    // Create the table in Hudi and sync it to Iceberg and Delta.
+ String tableName = getTableName();
+ OneTableClient oneTableClient = new OneTableClient(jsc.hadoopConfiguration());
+ List<?> insertRecords;
+ try (GenericTable table =
+ GenericTable.getInstanceWithCommonSchema(
+ tableName, tempDir, sparkSession, jsc, TableFormat.HUDI)) {
+ insertRecords = table.insertRows(100);
+
+ PerTableConfig perTableConfig =
+ PerTableConfig.builder()
+ .tableName(tableName)
+ .targetTableFormats(Arrays.asList(TableFormat.ICEBERG, TableFormat.DELTA))
+ .tableBasePath(table.getBasePath())
+ .tableDataPath(table.getDataPath())
+ .hudiSourceConfig(
+ HudiSourceConfig.builder()
+ .partitionFieldSpecConfig(onetablePartitionConfig)
+ .build())
+ .syncMode(syncMode)
+ .build();
+ oneTableClient.sync(perTableConfig, hudiSourceClientProvider);
+ checkDatasetEquivalence(
+ TableFormat.HUDI, table, Arrays.asList(TableFormat.ICEBERG, TableFormat.DELTA), 100);
+
+      // Make multiple commits and then sync.
+ table.insertRows(100);
+ table.upsertRows(insertRecords.subList(0, 20));
+ oneTableClient.sync(perTableConfig, hudiSourceClientProvider);
+ checkDatasetEquivalence(
+ TableFormat.HUDI, table, Arrays.asList(TableFormat.DELTA, TableFormat.ICEBERG), 200);
+ }
+
+    // Now read the table as Delta and sync it to Hudi and Iceberg.
+ try (GenericTable table =
+ GenericTable.getInstanceWithCommonSchema(
+ tableName, tempDir, sparkSession, jsc, TableFormat.DELTA)) {
+ PerTableConfig perTableConfig =
+ PerTableConfig.builder()
+ .tableName(tableName)
+ .targetTableFormats(Arrays.asList(TableFormat.HUDI, TableFormat.ICEBERG))
+ .tableBasePath(table.getBasePath())
+ .tableDataPath(table.getDataPath())
+ .hudiSourceConfig(
+ HudiSourceConfig.builder()
+ .partitionFieldSpecConfig(onetablePartitionConfig)
+ .build())
+ .syncMode(syncMode)
+ .build();
+
+ table.insertRows(100);
+ oneTableClient.sync(perTableConfig, deltaSourceClientProvider);
+ checkDatasetEquivalence(
+ TableFormat.DELTA, table, Arrays.asList(TableFormat.HUDI, TableFormat.ICEBERG), 300);
+
+ table.insertRows(100);
+ oneTableClient.sync(perTableConfig, deltaSourceClientProvider);
+ checkDatasetEquivalence(
+ TableFormat.DELTA, table, Arrays.asList(TableFormat.HUDI, TableFormat.ICEBERG), 400);
+
+ table.upsertRows(insertRecords.subList(0, 20));
+ oneTableClient.sync(perTableConfig, deltaSourceClientProvider);
+ checkDatasetEquivalence(
+ TableFormat.DELTA, table, Arrays.asList(TableFormat.HUDI, TableFormat.ICEBERG), 400);
+ }
+
+    // Read the table as Iceberg and sync it to Hudi and Delta.
+ try (GenericTable table =
+ GenericTable.getInstanceWithCommonSchema(
+ tableName, tempDir, sparkSession, jsc, TableFormat.ICEBERG)) {
+ PerTableConfig perTableConfig =
+ PerTableConfig.builder()
+ .tableName(tableName)
+ .targetTableFormats(Arrays.asList(TableFormat.HUDI, TableFormat.DELTA))
+ .tableBasePath(table.getBasePath())
+ .tableDataPath(table.getDataPath())
+ .hudiSourceConfig(
+ HudiSourceConfig.builder()
+ .partitionFieldSpecConfig(onetablePartitionConfig)
+ .build())
+ .syncMode(syncMode)
+ .build();
+
+ table.insertRows(100);
+ oneTableClient.sync(perTableConfig, icebergSourceClientProvider);
+ checkDatasetEquivalence(
+ TableFormat.ICEBERG, table, Arrays.asList(TableFormat.HUDI, TableFormat.DELTA), 500);
+
+ table.insertRows(100);
+ oneTableClient.sync(perTableConfig, icebergSourceClientProvider);
+ checkDatasetEquivalence(
+ TableFormat.ICEBERG, table, Arrays.asList(TableFormat.HUDI, TableFormat.DELTA), 600);
+
+ table.upsertRows(insertRecords.subList(0, 20));
+ oneTableClient.sync(perTableConfig, icebergSourceClientProvider);
+ checkDatasetEquivalence(
+ TableFormat.ICEBERG, table, Arrays.asList(TableFormat.HUDI, TableFormat.DELTA), 600);
+ }
+
+    // Finally, read the table as Hudi once more and sync it to Iceberg and Delta.
+ try (GenericTable table =
+ GenericTable.getInstanceWithCommonSchema(
+ tableName, tempDir, sparkSession, jsc, TableFormat.HUDI)) {
+ PerTableConfig perTableConfig =
+ PerTableConfig.builder()
+ .tableName(tableName)
+ .targetTableFormats(Arrays.asList(TableFormat.ICEBERG, TableFormat.DELTA))
+ .tableBasePath(table.getBasePath())
+ .tableDataPath(table.getDataPath())
+ .hudiSourceConfig(
+ HudiSourceConfig.builder()
+ .partitionFieldSpecConfig(onetablePartitionConfig)
+ .build())
+ .syncMode(syncMode)
+ .build();
+
+ table.insertRows(100);
+ oneTableClient.sync(perTableConfig, hudiSourceClientProvider);
+ checkDatasetEquivalence(
+ TableFormat.HUDI, table, Arrays.asList(TableFormat.ICEBERG, TableFormat.DELTA), 700);
+
+ table.insertRows(100);
+ oneTableClient.sync(perTableConfig, hudiSourceClientProvider);
+ checkDatasetEquivalence(
+ TableFormat.HUDI, table, Arrays.asList(TableFormat.ICEBERG, TableFormat.DELTA), 800);
+
+ table.upsertRows(insertRecords.subList(0, 20));
+ oneTableClient.sync(perTableConfig, hudiSourceClientProvider);
+ checkDatasetEquivalence(
+ TableFormat.HUDI, table, Arrays.asList(TableFormat.ICEBERG, TableFormat.DELTA), 800);
+ }
+ }
+
private Map<String, String> getTimeTravelOption(TableFormat tableFormat, Instant time) {
Map<String, String> options = new HashMap<>();
switch (tableFormat) {
diff --git a/core/src/test/java/io/onetable/TestAbstractHudiTable.java b/core/src/test/java/io/onetable/TestAbstractHudiTable.java
index 8fe5c1b..7e2fce6 100644
--- a/core/src/test/java/io/onetable/TestAbstractHudiTable.java
+++ b/core/src/test/java/io/onetable/TestAbstractHudiTable.java
@@ -761,6 +761,6 @@
@Override
public String getOrderByColumn() {
- return "_hoodie_record_key";
+ return RECORD_KEY_FIELD_NAME;
}
}
diff --git a/core/src/test/java/io/onetable/TestCommonTableHelper.java b/core/src/test/java/io/onetable/TestCommonTableHelper.java
new file mode 100644
index 0000000..beb76a9
--- /dev/null
+++ b/core/src/test/java/io/onetable/TestCommonTableHelper.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.onetable;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+
+import org.apache.avro.Schema;
+import org.apache.spark.sql.types.StructType;
+
+import io.onetable.avro.AvroSchemaConverter;
+import io.onetable.delta.DeltaSchemaExtractor;
+import io.onetable.iceberg.IcebergSchemaExtractor;
+import io.onetable.model.schema.OneSchema;
+
+public class TestCommonTableHelper {
+ private static Schema COMMON_SCHEMA;
+ private static OneSchema COMMON_ONE_SCHEMA;
+
+ static {
+ try (InputStream inputStream =
+ GenericTable.class.getClassLoader().getResourceAsStream("schemas/common_schema.avsc")) {
+ COMMON_SCHEMA = new Schema.Parser().parse(inputStream);
+ COMMON_ONE_SCHEMA = AvroSchemaConverter.getInstance().toOneSchema(COMMON_SCHEMA);
+ } catch (IOException ex) {
+ throw new UncheckedIOException(ex);
+ }
+ }
+
+ public static Schema getCommonSchemaInAvroFormat() {
+ return COMMON_SCHEMA;
+ }
+
+ public static StructType getCommonSchemaInStructFormat() {
+ return DeltaSchemaExtractor.getInstance().fromOneSchema(COMMON_ONE_SCHEMA);
+ }
+
+ public static org.apache.iceberg.Schema getCommonSchemaInIcebergFormat() {
+ return IcebergSchemaExtractor.getInstance().toIceberg(COMMON_ONE_SCHEMA);
+ }
+
+ public static String getHudiPartitionConfig() {
+ return "level:SIMPLE";
+ }
+
+ public static String getDeltaPartitionField() {
+ return "level";
+ }
+
+ public static String getIcebergPartitionField() {
+ return "level";
+ }
+}
diff --git a/core/src/test/java/io/onetable/TestIcebergTable.java b/core/src/test/java/io/onetable/TestIcebergTable.java
index ada0880..3d5bf0c 100644
--- a/core/src/test/java/io/onetable/TestIcebergTable.java
+++ b/core/src/test/java/io/onetable/TestIcebergTable.java
@@ -18,6 +18,7 @@
package io.onetable;
+import static io.onetable.iceberg.TestIcebergDataHelper.DEFAULT_RECORD_KEY_FIELD;
import static io.onetable.iceberg.TestIcebergDataHelper.createIcebergDataHelper;
import static org.apache.iceberg.SnapshotSummary.TOTAL_RECORDS_PROP;
import static org.junit.jupiter.api.Assertions.*;
@@ -31,6 +32,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
@@ -70,7 +72,6 @@
@Getter
public class TestIcebergTable implements GenericTable<Record, String> {
- private static final String DEFAULT_RECORD_KEY_FIELD = "id";
private static final List<String> DEFAULT_PARTITION_FIELDS = Collections.singletonList("level");
private final String tableName;
@@ -87,7 +88,7 @@
tableName,
tempDir,
hadoopConf,
- DEFAULT_RECORD_KEY_FIELD,
+ Optional.empty(),
Collections.singletonList(partitionField),
false);
}
@@ -98,23 +99,38 @@
tableName,
tempDir,
hadoopConf,
- DEFAULT_RECORD_KEY_FIELD,
+ Optional.empty(),
Collections.singletonList(partitionField),
true);
}
+ public static TestIcebergTable forGivenSchemaAndPartitioning(
+ String tableName,
+ Path tempDir,
+ Configuration hadoopConf,
+ Schema tableSchema,
+ String partitionField) {
+ return new TestIcebergTable(
+ tableName,
+ tempDir,
+ hadoopConf,
+ Optional.of(tableSchema),
+ Collections.singletonList(partitionField),
+ false);
+ }
+
public TestIcebergTable(
String tableName,
Path tempDir,
Configuration hadoopConf,
- String recordKeyField,
+ Optional<Schema> tableSchema,
List<String> partitionFields,
boolean includeAdditionalColumns) {
this.tableName = tableName;
this.basePath = tempDir.toUri().toString();
this.icebergDataHelper =
createIcebergDataHelper(
- recordKeyField, filterNullFields(partitionFields), includeAdditionalColumns);
+ tableSchema, filterNullFields(partitionFields), includeAdditionalColumns);
this.schema = icebergDataHelper.getTableSchema();
PartitionSpec partitionSpec = icebergDataHelper.getPartitionSpec();
@@ -294,7 +310,7 @@
@Override
public String getFilterQuery() {
- return String.format("%s > 'aaa'", icebergDataHelper.getRecordKeyField());
+ return String.format("%s > 'aaa'", DEFAULT_RECORD_KEY_FIELD);
}
public Long getLastCommitTimestamp() {
diff --git a/core/src/test/java/io/onetable/TestJavaHudiTable.java b/core/src/test/java/io/onetable/TestJavaHudiTable.java
index 0130787..8ae179b 100644
--- a/core/src/test/java/io/onetable/TestJavaHudiTable.java
+++ b/core/src/test/java/io/onetable/TestJavaHudiTable.java
@@ -36,6 +36,7 @@
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.api.java.JavaSparkContext;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.HoodieJavaWriteClient;
@@ -96,39 +97,23 @@
}
/**
- * Create a test table instance with a schema that has more fields than an instance returned by
- * {@link #forStandardSchema(String, Path, String, HoodieTableType)}. Specifically this instance
- * will add a top level field, nested field, field within a list, and field within a map to ensure
- * schema evolution is properly handled.
+   * Create a test table instance for general testing with the given schema and partitioning (if
+   * a partition config is provided).
*
- * @param tableName name of the table used in the test, should be unique per test within a shared
- * directory
- * @param tempDir directory where table will be written, typically a temporary directory that will
- * be cleaned up after the tests.
- * @param partitionConfig sets the property `hoodie.datasource.write.partitionpath.field` for the
- * {@link CustomKeyGenerator}. If null, {@link NonpartitionedKeyGenerator} will be used.
- * @param tableType the table type to use (MoR or CoW)
- * @return an instance of the class with this configuration
+   * @param tableName unique name of the table used in the test
+   * @param tempDir temporary directory where the table will be written
+   * @param jsc the {@link JavaSparkContext} for the test
+   * @param tableSchema the Avro schema to use for the table
+   * @param partitionConfig partition path field config; if null, the table is not partitioned
+   * @return an instance of the class with this configuration
*/
- public static TestJavaHudiTable withAdditionalColumns(
- String tableName, Path tempDir, String partitionConfig, HoodieTableType tableType) {
- return new TestJavaHudiTable(
- tableName,
- addSchemaEvolutionFieldsToBase(BASIC_SCHEMA),
- tempDir,
- partitionConfig,
- tableType,
- null);
- }
-
- public static TestJavaHudiTable withAdditionalTopLevelField(
+ public static TestJavaHudiTable forGivenSchemaAndPartitioning(
String tableName,
Path tempDir,
- String partitionConfig,
- HoodieTableType tableType,
- Schema previousSchema) {
- return new TestJavaHudiTable(
- tableName, addTopLevelField(previousSchema), tempDir, partitionConfig, tableType, null);
+ JavaSparkContext jsc,
+ Schema tableSchema,
+ String partitionConfig) {
+ return withSchema(
+ tableName, tempDir, partitionConfig, HoodieTableType.COPY_ON_WRITE, tableSchema);
}
public static TestJavaHudiTable withSchema(
diff --git a/core/src/test/java/io/onetable/TestSparkDeltaTable.java b/core/src/test/java/io/onetable/TestSparkDeltaTable.java
index 89bc9f2..793d664 100644
--- a/core/src/test/java/io/onetable/TestSparkDeltaTable.java
+++ b/core/src/test/java/io/onetable/TestSparkDeltaTable.java
@@ -29,6 +29,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.stream.Collectors;
import lombok.Getter;
@@ -39,6 +40,7 @@
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
+import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.delta.DeltaLog;
@@ -63,18 +65,31 @@
public static TestSparkDeltaTable forStandardSchemaAndPartitioning(
String tableName, Path tempDir, SparkSession sparkSession, String partitionField) {
- return new TestSparkDeltaTable(tableName, tempDir, sparkSession, partitionField, false);
+ return new TestSparkDeltaTable(
+ tableName, tempDir, sparkSession, Optional.empty(), partitionField, false);
}
public static TestSparkDeltaTable forSchemaWithAdditionalColumnsAndPartitioning(
String tableName, Path tempDir, SparkSession sparkSession, String partitionField) {
- return new TestSparkDeltaTable(tableName, tempDir, sparkSession, partitionField, true);
+ return new TestSparkDeltaTable(
+ tableName, tempDir, sparkSession, Optional.empty(), partitionField, true);
+ }
+
+ public static TestSparkDeltaTable forGivenSchemaAndPartitioning(
+ String tableName,
+ Path tempDir,
+ SparkSession sparkSession,
+ StructType tableSchema,
+ String partitionField) {
+ return new TestSparkDeltaTable(
+ tableName, tempDir, sparkSession, Optional.of(tableSchema), partitionField, false);
}
public TestSparkDeltaTable(
String name,
Path tempDir,
SparkSession sparkSession,
+ Optional<StructType> tableSchema,
String partitionField,
boolean includeAdditionalColumns) {
try {
@@ -83,7 +98,8 @@
this.sparkSession = sparkSession;
this.partitionField = partitionField;
this.includeAdditionalColumns = includeAdditionalColumns;
- this.testDeltaHelper = createTestDataHelper(partitionField, includeAdditionalColumns);
+ this.testDeltaHelper =
+ createTestDataHelper(tableSchema, partitionField, includeAdditionalColumns);
testDeltaHelper.createTable(sparkSession, tableName, basePath);
this.deltaLog = DeltaLog.forTable(sparkSession, basePath);
this.deltaTable = DeltaTable.forPath(sparkSession, basePath);
@@ -118,7 +134,7 @@
@Override
public String getOrderByColumn() {
- return "id";
+ return "key";
}
@SneakyThrows
@@ -199,7 +215,8 @@
}
private String initBasePath(Path tempDir, String tableName) throws IOException {
- Path basePath = tempDir.resolve(tableName);
+    // Append a suffix so the base path is not tied directly to the table name.
+ Path basePath = tempDir.resolve(tableName + "_v1");
Files.createDirectories(basePath);
return basePath.toUri().toString();
}
diff --git a/core/src/test/java/io/onetable/TestSparkHudiTable.java b/core/src/test/java/io/onetable/TestSparkHudiTable.java
index 205d54c..4e3e45f 100644
--- a/core/src/test/java/io/onetable/TestSparkHudiTable.java
+++ b/core/src/test/java/io/onetable/TestSparkHudiTable.java
@@ -115,34 +115,23 @@
}
/**
- * Create a test table instance with a schema that has more fields than an instance returned by
- * {@link #forStandardSchema(String, Path, JavaSparkContext, String, HoodieTableType)}.
- * Specifically this instance will add a top level field, nested field, field within a list, and
- * field within a map to ensure schema evolution is properly handled.
+   * Create a test table instance for general testing with the given schema and partitioning (if
+   * a partition config is provided).
*
- * @param tableName name of the table used in the test, should be unique per test within a shared
- * directory
- * @param tempDir directory where table will be written, typically a temporary directory that will
- * be cleaned up after the tests.
- * @param jsc the {@link JavaSparkContext} to use when writing data with Hudi
- * @param partitionConfig sets the property `hoodie.datasource.write.partitionpath.field` for the
- * {@link CustomKeyGenerator}. If null, {@link NonpartitionedKeyGenerator} will be used.
- * @param tableType the table type to use (MoR or CoW)
- * @return an instance of the class with this configuration
+   * @param tableName unique name of the table used in the test
+   * @param tempDir temporary directory where the table will be written
+   * @param jsc the {@link JavaSparkContext} to use when writing data with Hudi
+   * @param tableSchema the Avro schema to use for the table
+   * @param partitionConfig partition path field config; if null, the table is not partitioned
+   * @return an instance of the class with this configuration
*/
- public static TestSparkHudiTable withAdditionalColumns(
+ public static TestSparkHudiTable forGivenSchemaAndPartitioning(
String tableName,
Path tempDir,
JavaSparkContext jsc,
- String partitionConfig,
- HoodieTableType tableType) {
+ Schema tableSchema,
+ String partitionConfig) {
return new TestSparkHudiTable(
- tableName,
- addSchemaEvolutionFieldsToBase(BASIC_SCHEMA),
- tempDir,
- jsc,
- partitionConfig,
- tableType);
+ tableName, tableSchema, tempDir, jsc, partitionConfig, HoodieTableType.COPY_ON_WRITE);
}
private TestSparkHudiTable(
diff --git a/core/src/test/java/io/onetable/delta/ITDeltaSourceClient.java b/core/src/test/java/io/onetable/delta/ITDeltaSourceClient.java
index c50ddd6..149468c 100644
--- a/core/src/test/java/io/onetable/delta/ITDeltaSourceClient.java
+++ b/core/src/test/java/io/onetable/delta/ITDeltaSourceClient.java
@@ -35,6 +35,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -323,7 +324,12 @@
String tableName = getTableName();
TestSparkDeltaTable testSparkDeltaTable =
new TestSparkDeltaTable(
- tableName, tempDir, sparkSession, isPartitioned ? "yearOfBirth" : null, false);
+ tableName,
+ tempDir,
+ sparkSession,
+ Optional.empty(),
+ isPartitioned ? "yearOfBirth" : null,
+ false);
List<List<String>> allActiveFiles = new ArrayList<>();
List<TableChange> allTableChanges = new ArrayList<>();
List<Row> rows = testSparkDeltaTable.insertRows(50);
@@ -378,7 +384,12 @@
String tableName = getTableName();
TestSparkDeltaTable testSparkDeltaTable =
new TestSparkDeltaTable(
- tableName, tempDir, sparkSession, isPartitioned ? "yearOfBirth" : null, false);
+ tableName,
+ tempDir,
+ sparkSession,
+ Optional.empty(),
+ isPartitioned ? "yearOfBirth" : null,
+ false);
// Insert 50 rows to 2018 partition.
List<Row> commit1Rows = testSparkDeltaTable.insertRowsForPartition(50, 2018);
Long timestamp1 = testSparkDeltaTable.getLastCommitTimestamp();
@@ -428,7 +439,12 @@
String tableName = getTableName();
TestSparkDeltaTable testSparkDeltaTable =
new TestSparkDeltaTable(
- tableName, tempDir, sparkSession, isPartitioned ? "yearOfBirth" : null, false);
+ tableName,
+ tempDir,
+ sparkSession,
+ Optional.empty(),
+ isPartitioned ? "yearOfBirth" : null,
+ false);
List<List<String>> allActiveFiles = new ArrayList<>();
List<TableChange> allTableChanges = new ArrayList<>();
List<Row> rows = testSparkDeltaTable.insertRows(50);
@@ -482,7 +498,12 @@
String tableName = getTableName();
TestSparkDeltaTable testSparkDeltaTable =
new TestSparkDeltaTable(
- tableName, tempDir, sparkSession, isPartitioned ? "yearOfBirth" : null, true);
+ tableName,
+ tempDir,
+ sparkSession,
+ Optional.empty(),
+ isPartitioned ? "yearOfBirth" : null,
+ true);
List<List<String>> allActiveFiles = new ArrayList<>();
List<TableChange> allTableChanges = new ArrayList<>();
List<Row> rows = testSparkDeltaTable.insertRows(50);
@@ -526,7 +547,8 @@
public void testDropPartition() {
String tableName = getTableName();
TestSparkDeltaTable testSparkDeltaTable =
- new TestSparkDeltaTable(tableName, tempDir, sparkSession, "yearOfBirth", false);
+ new TestSparkDeltaTable(
+ tableName, tempDir, sparkSession, Optional.empty(), "yearOfBirth", false);
List<List<String>> allActiveFiles = new ArrayList<>();
List<TableChange> allTableChanges = new ArrayList<>();
@@ -583,7 +605,12 @@
String tableName = getTableName();
TestSparkDeltaTable testSparkDeltaTable =
new TestSparkDeltaTable(
- tableName, tempDir, sparkSession, isPartitioned ? "yearOfBirth" : null, false);
+ tableName,
+ tempDir,
+ sparkSession,
+ Optional.empty(),
+ isPartitioned ? "yearOfBirth" : null,
+ false);
List<List<String>> allActiveFiles = new ArrayList<>();
List<TableChange> allTableChanges = new ArrayList<>();
List<Row> rows = testSparkDeltaTable.insertRows(50);
diff --git a/core/src/test/java/io/onetable/delta/TestDeltaHelper.java b/core/src/test/java/io/onetable/delta/TestDeltaHelper.java
index a1d1ad1..934552a 100644
--- a/core/src/test/java/io/onetable/delta/TestDeltaHelper.java
+++ b/core/src/test/java/io/onetable/delta/TestDeltaHelper.java
@@ -39,6 +39,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.Optional;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
@@ -76,7 +77,7 @@
private static final StructField[] COMMON_FIELDS =
new StructField[] {
- new StructField("id", IntegerType, false, Metadata.empty()),
+ new StructField("key", StringType, false, Metadata.empty()),
new StructField("firstName", StringType, true, Metadata.empty()),
new StructField("lastName", StringType, true, Metadata.empty()),
new StructField("gender", StringType, true, Metadata.empty()),
@@ -108,7 +109,13 @@
private static final StructField[] ADDITIONAL_FIELDS =
new StructField[] {new StructField("street", StringType, true, Metadata.empty())};
private static final StructField[] DATE_PARTITIONED_FIELDS =
- new StructField[] {new StructField("yearOfBirth", IntegerType, true, Metadata.empty())};
+ new StructField[] {
+ new StructField(
+ "yearOfBirth",
+ IntegerType,
+ true,
+ Metadata.fromJson("{\"delta.generationExpression\": \"YEAR(birthDate)\"}"))
+ };
private static final Random RANDOM = new Random();
private static final String[] GENDERS = {"Male", "Female"};
@@ -118,8 +125,11 @@
boolean includeAdditionalColumns;
public static TestDeltaHelper createTestDataHelper(
- String partitionField, boolean includeAdditionalColumns) {
- StructType tableSchema = generateDynamicSchema(partitionField, includeAdditionalColumns);
+ Optional<StructType> tblInputSchema,
+ String partitionField,
+ boolean includeAdditionalColumns) {
+ StructType tableSchema =
+ tblInputSchema.orElse(generateDynamicSchema(partitionField, includeAdditionalColumns));
return TestDeltaHelper.builder()
.tableStructSchema(tableSchema)
.partitionField(partitionField)
@@ -142,26 +152,31 @@
public void createTable(SparkSession sparkSession, String tableName, String basePath) {
DeltaTableBuilder tableBuilder =
DeltaTable.createIfNotExists(sparkSession).tableName(tableName).location(basePath);
- Arrays.stream(COMMON_FIELDS).forEach(tableBuilder::addColumn);
- if ("yearOfBirth".equals(partitionField)) {
- tableBuilder
- .addColumn(
- DeltaTable.columnBuilder("yearOfBirth")
- .dataType(IntegerType)
- .generatedAlwaysAs("YEAR(birthDate)")
- .build())
- .partitionedBy("yearOfBirth");
- } else if ("level".equals(partitionField)) {
- tableBuilder.partitionedBy(partitionField);
+ for (StructField sf : tableStructSchema.fields()) {
+ tableBuilder = addFieldToTableBuilder(tableBuilder, sf);
+ }
+ if ("yearOfBirth".equals(partitionField) || "level".equals(partitionField)) {
+ tableBuilder = tableBuilder.partitionedBy(partitionField);
} else if (partitionField != null) {
throw new IllegalArgumentException("Unexpected partition field: " + partitionField);
}
- if (includeAdditionalColumns) {
- tableBuilder.addColumn("street", StringType);
- }
tableBuilder.execute();
}
+ private DeltaTableBuilder addFieldToTableBuilder(DeltaTableBuilder tableBuilder, StructField sf) {
+ if (sf.metadata().contains("delta.generationExpression")) {
+ String generatedExpression = sf.metadata().getString("delta.generationExpression");
+ if (generatedExpression != null) {
+ return tableBuilder.addColumn(
+ DeltaTable.columnBuilder(sf.name())
+ .dataType(sf.dataType())
+ .generatedAlwaysAs(generatedExpression)
+ .build());
+ }
+ }
+ return tableBuilder.addColumn(sf.name(), sf.dataType());
+ }
+
public Row generateRandomRow() {
int year = 2013 + RANDOM.nextInt(11);
String levelValue = LEVEL_VALUES.get(RANDOM.nextInt(LEVEL_VALUES.size()));
@@ -186,8 +201,8 @@
private Object generateValueForField(StructField field, int yearValue, String levelValue) {
switch (field.name()) {
- case "id":
- return ID_GENERATOR.incrementAndGet();
+ case "key":
+ return generateRandomString();
case "gender":
return GENDERS[RANDOM.nextInt(GENDERS.length)];
case "birthDate":
diff --git a/core/src/test/java/io/onetable/delta/TestDeltaPartitionExtractor.java b/core/src/test/java/io/onetable/delta/TestDeltaPartitionExtractor.java
index 8e9da45..5379e3c 100644
--- a/core/src/test/java/io/onetable/delta/TestDeltaPartitionExtractor.java
+++ b/core/src/test/java/io/onetable/delta/TestDeltaPartitionExtractor.java
@@ -47,7 +47,7 @@
private static final Map<String, StructField> STRUCT_FIELD_MAP =
new HashMap<String, StructField>() {
{
- put("id", DataTypes.createStructField("id", DataTypes.IntegerType, false));
+ put("key", DataTypes.createStructField("key", DataTypes.IntegerType, false));
put("firstName", DataTypes.createStructField("firstName", DataTypes.StringType, false));
put("gender", DataTypes.createStructField("gender", DataTypes.StringType, false));
put(
@@ -115,7 +115,7 @@
@Test
public void testUnpartitionedTable() {
StructType tableSchema =
- getSchemaWithFields(Arrays.asList("id", "firstName", "gender", "birthDate"));
+ getSchemaWithFields(Arrays.asList("key", "firstName", "gender", "birthDate"));
OneSchema oneSchema = deltaSchemaExtractor.toOneSchema(tableSchema);
List<OnePartitionField> onePartitionFields =
deltaPartitionExtractor.convertFromDeltaPartitionFormat(oneSchema, new StructType());
@@ -125,7 +125,7 @@
@Test
public void testSimplePartitionedTable() {
StructType tableSchema =
- getSchemaWithFields(Arrays.asList("id", "firstName", "gender", "birthDate"));
+ getSchemaWithFields(Arrays.asList("key", "firstName", "gender", "birthDate"));
StructType partitionSchema = getSchemaWithFields(Arrays.asList("gender"));
OneSchema oneSchema = deltaSchemaExtractor.toOneSchema(tableSchema);
List<OnePartitionField> expectedOnePartitionFields =
@@ -146,7 +146,8 @@
@Test
public void testDatePartitionedGeneratedColumnsTable() {
StructType tableSchema =
- getSchemaWithFields(Arrays.asList("id", "firstName", "gender", "birthDate", "dateOfBirth"));
+ getSchemaWithFields(
+ Arrays.asList("key", "firstName", "gender", "birthDate", "dateOfBirth"));
StructType partitionSchema = getSchemaWithFields(Arrays.asList("dateOfBirth"));
OneSchema oneSchema = deltaSchemaExtractor.toOneSchema(tableSchema);
List<OnePartitionField> expectedOnePartitionFields =
@@ -164,7 +165,7 @@
@Test
public void testDateFormatPartitionedGeneratedColumnsTable() {
StructType tableSchema =
- getSchemaWithFields(Arrays.asList("id", "firstName", "gender", "birthDate", "dateFmt"));
+ getSchemaWithFields(Arrays.asList("key", "firstName", "gender", "birthDate", "dateFmt"));
StructType partitionSchema = getSchemaWithFields(Arrays.asList("dateFmt"));
OneSchema oneSchema = deltaSchemaExtractor.toOneSchema(tableSchema);
List<OnePartitionField> expectedOnePartitionFields =
@@ -182,7 +183,8 @@
@Test
public void yearPartitionedGeneratedColumnsTable() {
StructType tableSchema =
- getSchemaWithFields(Arrays.asList("id", "firstName", "gender", "birthDate", "yearOfBirth"));
+ getSchemaWithFields(
+ Arrays.asList("key", "firstName", "gender", "birthDate", "yearOfBirth"));
StructType partitionSchema = getSchemaWithFields(Arrays.asList("yearOfBirth"));
OneSchema oneSchema = deltaSchemaExtractor.toOneSchema(tableSchema);
List<OnePartitionField> expectedOnePartitionFields =
@@ -200,8 +202,9 @@
@Test
public void yearAndSimpleCombinedPartitionedGeneratedColumnsTable() {
StructType tableSchema =
- getSchemaWithFields(Arrays.asList("id", "firstName", "gender", "birthDate", "yearOfBirth"));
- StructType partitionSchema = getSchemaWithFields(Arrays.asList("yearOfBirth", "id"));
+ getSchemaWithFields(
+ Arrays.asList("key", "firstName", "gender", "birthDate", "yearOfBirth"));
+ StructType partitionSchema = getSchemaWithFields(Arrays.asList("yearOfBirth", "key"));
OneSchema oneSchema = deltaSchemaExtractor.toOneSchema(tableSchema);
List<OnePartitionField> expectedOnePartitionFields =
Arrays.asList(
@@ -213,7 +216,7 @@
OnePartitionField.builder()
.sourceField(
OneField.builder()
- .name("id")
+ .name("key")
.schema(OneSchema.builder().name("integer").dataType(OneType.INT).build())
.build())
.transformType(PartitionTransformType.VALUE)
@@ -228,7 +231,7 @@
StructType tableSchema =
getSchemaWithFields(
Arrays.asList(
- "id",
+ "key",
"firstName",
"gender",
"birthDate",
@@ -257,16 +260,16 @@
@Test
public void testCombinationOfPlainAndGeneratedColumns() {
StructType tableSchema =
- getSchemaWithFields(Arrays.asList("id", "firstName", "gender", "birthDate", "dateFmt"));
+ getSchemaWithFields(Arrays.asList("key", "firstName", "gender", "birthDate", "dateFmt"));
StructType partitionSchema =
- getSchemaWithFields(Arrays.asList("id", "dateFmt", "gender", "dateOfBirth"));
+ getSchemaWithFields(Arrays.asList("key", "dateFmt", "gender", "dateOfBirth"));
OneSchema oneSchema = deltaSchemaExtractor.toOneSchema(tableSchema);
List<OnePartitionField> expectedOnePartitionFields =
Arrays.asList(
OnePartitionField.builder()
.sourceField(
OneField.builder()
- .name("id")
+ .name("key")
.schema(OneSchema.builder().name("integer").dataType(OneType.INT).build())
.build())
.transformType(PartitionTransformType.VALUE)
diff --git a/core/src/test/java/io/onetable/hudi/ITHudiTargetClient.java b/core/src/test/java/io/onetable/hudi/ITHudiTargetClient.java
index c2fd739..84c6afe 100644
--- a/core/src/test/java/io/onetable/hudi/ITHudiTargetClient.java
+++ b/core/src/test/java/io/onetable/hudi/ITHudiTargetClient.java
@@ -99,7 +99,7 @@
private static final HoodieEngineContext CONTEXT = new HoodieJavaEngineContext(CONFIGURATION);
private static final String TABLE_NAME = "test_table";
- private static final String KEY_FIELD_NAME = "id";
+ private static final String KEY_FIELD_NAME = "key";
private static final String PARTITION_FIELD_NAME = "partition_field";
private static final String OTHER_FIELD_NAME = "content";
private static final long FILE_SIZE = 100L;
diff --git a/core/src/test/java/io/onetable/iceberg/TestIcebergDataHelper.java b/core/src/test/java/io/onetable/iceberg/TestIcebergDataHelper.java
index f2ad0c6..174a250 100644
--- a/core/src/test/java/io/onetable/iceberg/TestIcebergDataHelper.java
+++ b/core/src/test/java/io/onetable/iceberg/TestIcebergDataHelper.java
@@ -33,6 +33,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Random;
import java.util.UUID;
import java.util.stream.Collectors;
@@ -55,10 +56,12 @@
@Builder
@Value
public class TestIcebergDataHelper {
+ public static final String DEFAULT_RECORD_KEY_FIELD = "key";
+
private static final Random RANDOM = new Random();
private static final List<Types.NestedField> COMMON_FIELDS =
Arrays.asList(
- NestedField.optional(1, "id", Types.StringType.get()),
+ NestedField.optional(1, "key", Types.StringType.get()),
NestedField.optional(2, "ts", Types.LongType.get()),
NestedField.optional(3, "level", Types.StringType.get()),
NestedField.optional(4, "severity", Types.IntegerType.get()),
@@ -114,15 +117,14 @@
private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();
Schema tableSchema;
- String recordKeyField;
List<String> partitionFieldNames;
public static TestIcebergDataHelper createIcebergDataHelper(
- String recordKeyField, List<String> partitionFields, boolean includeAdditionalColumns) {
- Schema tableSchema = getSchema(includeAdditionalColumns);
+ Optional<Schema> tblSchema, List<String> partitionFields, boolean includeAdditionalColumns) {
+
+ Schema schema = tblSchema.orElse(getSchema(includeAdditionalColumns));
return TestIcebergDataHelper.builder()
- .tableSchema(tableSchema)
- .recordKeyField(recordKeyField)
+ .tableSchema(schema)
.partitionFieldNames(partitionFields)
.build();
}
@@ -193,7 +195,7 @@
String fieldName = field.name();
Object value;
- if (fieldName.equals(recordKeyField)
+ if (fieldName.equals(DEFAULT_RECORD_KEY_FIELD)
|| (partitionFieldNames != null && partitionFieldNames.contains(fieldName))) {
value = existingRecord.getField(fieldName);
} else {
@@ -237,7 +239,7 @@
Type fieldType = field.type();
if (partitionValue != null && partitionFieldNames.contains(fieldName)) {
return partitionValue;
- } else if (fieldName.equals(recordKeyField)) {
+ } else if (fieldName.equals(DEFAULT_RECORD_KEY_FIELD)) {
return keyValue;
} else if (fieldName.equals("ts")) {
return System.currentTimeMillis();
diff --git a/core/src/test/java/io/onetable/iceberg/TestIcebergPartitionValueConverter.java b/core/src/test/java/io/onetable/iceberg/TestIcebergPartitionValueConverter.java
index 7a3c884..2499f07 100644
--- a/core/src/test/java/io/onetable/iceberg/TestIcebergPartitionValueConverter.java
+++ b/core/src/test/java/io/onetable/iceberg/TestIcebergPartitionValueConverter.java
@@ -45,12 +45,12 @@
IcebergPartitionValueConverter.getInstance();
private static final Schema SCHEMA =
new Schema(
- Types.NestedField.optional(1, "id", Types.IntegerType.get()),
+ Types.NestedField.optional(1, "key", Types.IntegerType.get()),
Types.NestedField.optional(2, "name", Types.StringType.get()),
Types.NestedField.optional(3, "birthDate", Types.TimestampType.withZone()));
private static final Schema SCHEMA_WITH_PARTITION =
new Schema(
- Types.NestedField.optional(1, "id", Types.IntegerType.get()),
+ Types.NestedField.optional(1, "key", Types.IntegerType.get()),
Types.NestedField.optional(2, "name", Types.StringType.get()),
Types.NestedField.optional(3, "birthDate", Types.TimestampType.withZone()),
Types.NestedField.optional(4, "birthDate_year", Types.IntegerType.get()));
diff --git a/core/src/test/resources/schemas/common_schema.avsc b/core/src/test/resources/schemas/common_schema.avsc
new file mode 100644
index 0000000..8659f3f
--- /dev/null
+++ b/core/src/test/resources/schemas/common_schema.avsc
@@ -0,0 +1,62 @@
+{
+ "type": "record",
+ "name": "CommonSchema",
+ "namespace": "test",
+ "fields": [
+ {
+ "name": "key",
+ "type": "string"
+ },
+ {
+ "name": "ts",
+ "type": "long"
+ },
+ {
+ "name": "level",
+ "type": "string"
+ },
+ {
+ "name": "severity",
+ "type": ["null", "int"],
+ "default": null
+ },
+ {
+ "name": "double_field",
+ "type": "double",
+ "default": 0.0
+ },
+ {
+ "name": "float_field",
+ "type": "float",
+ "default": 0.0
+ },
+ {
+ "name": "int_field",
+ "type": "int",
+ "default": 0
+ },
+ {
+ "name": "long_field",
+ "type": "long",
+ "default": 0
+ },
+ {
+ "name": "boolean_field",
+ "type": "boolean",
+ "default": false
+ },
+ {
+ "name": "string_field",
+ "type": {
+ "type": "string",
+ "avro.java.string": "String"
+ },
+ "default": ""
+ },
+ {
+ "name": "bytes_field",
+ "type": "bytes",
+ "default": ""
+ }
+ ]
+}
\ No newline at end of file