more changes
diff --git a/core/src/test/java/io/onetable/TestIcebergTable.java b/core/src/test/java/io/onetable/TestIcebergTable.java
index a0d6380..c7addb9 100644
--- a/core/src/test/java/io/onetable/TestIcebergTable.java
+++ b/core/src/test/java/io/onetable/TestIcebergTable.java
@@ -65,6 +65,8 @@
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.types.Types;
+import com.google.common.base.Preconditions;
+
import io.onetable.iceberg.TestIcebergDataHelper;
@Getter
@@ -140,11 +142,10 @@
return records;
}
- @Override
@SneakyThrows
- public List<Record> insertRecordsForSpecialPartition(int numRows) {
+ public List<Record> insertRecordsForPartition(int numRows, String partitionValue) {
List<Record> records =
- icebergDataHelper.generateInsertRecordForPartition(numRows, SPECIAL_PARTITION_VALUE);
+ icebergDataHelper.generateInsertRecordForPartition(numRows, partitionValue);
GenericAppenderFactory appenderFactory = new GenericAppenderFactory(icebergTable.schema());
PartitionSpec spec = icebergTable.spec();
@@ -164,6 +165,12 @@
return records;
}
+ @Override
+ @SneakyThrows
+ public List<Record> insertRecordsForSpecialPartition(int numRows) {
+ return insertRecordsForPartition(numRows, SPECIAL_PARTITION_VALUE);
+ }
+
@SneakyThrows
@Override
public void upsertRows(List<Record> recordsToUpdate) {
@@ -262,6 +269,17 @@
return DEFAULT_RECORD_KEY_FIELD;
}
+ public Map<String, List<Record>> groupRecordsByPartition(List<Record> records) {
+ Preconditions.checkArgument(
+ icebergDataHelper.getPartitionFieldNames().size() == 1,
+ "Only single partition field is supported for grouping records by partition");
+ Preconditions.checkArgument(
+ icebergDataHelper.getPartitionFieldNames().get(0).equals("level"),
+ "Only level partition field is supported for grouping records by partition");
+ return records.stream()
+ .collect(Collectors.groupingBy(record -> record.getField("level").toString()));
+ }
+
@Override
@SneakyThrows
public void close() {
diff --git a/core/src/test/java/io/onetable/iceberg/ITIcebergSourceClient.java b/core/src/test/java/io/onetable/iceberg/ITIcebergSourceClient.java
index 08303ce..b02f337 100644
--- a/core/src/test/java/io/onetable/iceberg/ITIcebergSourceClient.java
+++ b/core/src/test/java/io/onetable/iceberg/ITIcebergSourceClient.java
@@ -28,9 +28,11 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
@@ -61,13 +63,12 @@
@ParameterizedTest
@ValueSource(booleans = {false, true})
- public void testIcebergSourceClient(boolean isPartitioned) {
+ public void testInsertsUpsertsAndDeletes(boolean isPartitioned) {
String tableName = getTableName();
try (TestIcebergTable testIcebergTable =
TestIcebergTable.forStandardSchemaAndPartitioning(
tableName, isPartitioned ? "level" : null, tempDir, hadoopConf)) {
List<List<String>> allActiveFiles = new ArrayList<>();
- List<List<String>> allBaseFilePaths = new ArrayList<>();
List<TableChange> allTableChanges = new ArrayList<>();
List<Record> records = testIcebergTable.insertRows(50);
@@ -118,6 +119,65 @@
}
}
+ @Test
+ public void testDropPartition() {
+ String tableName = getTableName();
+ try (TestIcebergTable testIcebergTable =
+ TestIcebergTable.forStandardSchemaAndPartitioning(
+ tableName, "level", tempDir, hadoopConf)) {
+ List<List<String>> allActiveFiles = new ArrayList<>();
+ List<TableChange> allTableChanges = new ArrayList<>();
+
+ List<Record> records = testIcebergTable.insertRows(50);
+ Long timestamp1 = testIcebergTable.getLastCommitTimestamp();
+ allActiveFiles.add(testIcebergTable.getAllActiveFiles());
+
+ List<Record> records1 = testIcebergTable.insertRows(50);
+ allActiveFiles.add(testIcebergTable.getAllActiveFiles());
+
+ List<Record> allRecords = new ArrayList<>();
+ allRecords.addAll(records);
+ allRecords.addAll(records1);
+
+ Map<String, List<Record>> recordsByPartition =
+ testIcebergTable.groupRecordsByPartition(allRecords);
+ String partitionValueToDelete = recordsByPartition.keySet().iterator().next();
+ testIcebergTable.deletePartition(partitionValueToDelete);
+ allActiveFiles.add(testIcebergTable.getAllActiveFiles());
+
+      // Insert a few records again for the deleted partition.
+ testIcebergTable.insertRecordsForPartition(20, partitionValueToDelete);
+ allActiveFiles.add(testIcebergTable.getAllActiveFiles());
+
+ PerTableConfig tableConfig =
+ PerTableConfig.builder()
+ .tableName(testIcebergTable.getTableName())
+ .tableBasePath(testIcebergTable.getBasePath())
+ .targetTableFormats(Arrays.asList(TableFormat.HUDI, TableFormat.DELTA))
+ .build();
+ IcebergSourceClient icebergSourceClient = clientProvider.getSourceClientInstance(tableConfig);
+ assertEquals(
+ 120 - recordsByPartition.get(partitionValueToDelete).size(),
+ testIcebergTable.getNumRows());
+ OneSnapshot oneSnapshot = icebergSourceClient.getCurrentSnapshot();
+
+ validateIcebergPartitioning(oneSnapshot);
+ validateOneSnapshot(oneSnapshot, allActiveFiles.get(allActiveFiles.size() - 1));
+ // Get changes in incremental format.
+ InstantsForIncrementalSync instantsForIncrementalSync =
+ InstantsForIncrementalSync.builder()
+ .lastSyncInstant(Instant.ofEpochMilli(timestamp1))
+ .build();
+ CommitsBacklog<Long> commitsBacklog =
+ icebergSourceClient.getCommitsBacklog(instantsForIncrementalSync);
+ for (Long version : commitsBacklog.getCommitsToProcess()) {
+ TableChange tableChange = icebergSourceClient.getTableChangeForCommit(version);
+ allTableChanges.add(tableChange);
+ }
+ validateTableChanges(allActiveFiles, allTableChanges);
+ }
+ }
+
private void validateIcebergPartitioning(OneSnapshot oneSnapshot) {
List<OnePartitionField> partitionFields = oneSnapshot.getTable().getPartitioningFields();
assertEquals(1, partitionFields.size());