/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.spark.extensions;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
import org.apache.spark.SparkException;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.hamcrest.CoreMatchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import static org.apache.iceberg.TableProperties.DELETE_ISOLATION_LEVEL;
import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES;
import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
import static org.apache.spark.sql.functions.lit;
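/**
 * Tests for Spark's DELETE FROM support on Iceberg tables: metadata-only deletes,
 * copy-on-write overwrites, subquery and null-handling semantics, and concurrent
 * operations under the serializable and snapshot isolation levels.
 */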
public abstract class TestDelete extends SparkRowLevelOperationsTestBase {
public TestDelete(String catalogName, String implementation, Map<String, String> config,
String fileFormat, Boolean vectorized) {
super(catalogName, implementation, config, fileFormat, vectorized);
}
@BeforeClass
public static void setupSparkConf() {
spark.conf().set("spark.sql.shuffle.partitions", "4");
}
@After
public void removeTables() {
sql("DROP TABLE IF EXISTS %s", tableName);
sql("DROP TABLE IF EXISTS deleted_id");
sql("DROP TABLE IF EXISTS deleted_dep");
}
@Test
public void testDeleteFromEmptyTable() {
createAndInitUnpartitionedTable();
sql("DELETE FROM %s WHERE id IN (1)", tableName);
sql("DELETE FROM %s WHERE dep = 'hr'", tableName);
Table table = validationCatalog.loadTable(tableIdent);
Assert.assertEquals("Should have 2 snapshots", 2, Iterables.size(table.snapshots()));
assertEquals("Should have expected rows",
ImmutableList.of(),
sql("SELECT * FROM %s ORDER BY id", tableName));
}
@Test
public void testExplain() {
createAndInitUnpartitionedTable();
sql("INSERT INTO TABLE %s VALUES (1, 'hr'), (2, 'hardware'), (null, 'hr')", tableName);
sql("EXPLAIN DELETE FROM %s WHERE id <=> 1", tableName);
sql("EXPLAIN DELETE FROM %s WHERE true", tableName);
Table table = validationCatalog.loadTable(tableIdent);
Assert.assertEquals("Should have 1 snapshot", 1, Iterables.size(table.snapshots()));
assertEquals("Should have expected rows",
ImmutableList.of(row(1, "hr"), row(2, "hardware"), row(null, "hr")),
sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", tableName));
}
@Test
public void testDeleteWithAlias() {
createAndInitUnpartitionedTable();
sql("INSERT INTO TABLE %s VALUES (1, 'hr'), (2, 'hardware'), (null, 'hr')", tableName);
sql("DELETE FROM %s AS t WHERE t.id IS NULL", tableName);
assertEquals("Should have expected rows",
ImmutableList.of(row(1, "hr"), row(2, "hardware")),
sql("SELECT * FROM %s ORDER BY id", tableName));
}
@Test
public void testDeleteWithDynamicFileFiltering() throws NoSuchTableException {
createAndInitPartitionedTable();
append(new Employee(1, "hr"), new Employee(3, "hr"));
append(new Employee(1, "hardware"), new Employee(2, "hardware"));
sql("DELETE FROM %s WHERE id = 2", tableName);
Table table = validationCatalog.loadTable(tableIdent);
Assert.assertEquals("Should have 3 snapshots", 3, Iterables.size(table.snapshots()));
Snapshot currentSnapshot = table.currentSnapshot();
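// dynamic file filtering should rewrite only the file containing id = 2:
// an overwrite that changes 1 partition, deletes 1 data file, and adds 1 back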
validateSnapshot(currentSnapshot, "overwrite", "1", "1", "1");
assertEquals("Should have expected rows",
ImmutableList.of(row(1, "hardware"), row(1, "hr"), row(3, "hr")),
sql("SELECT * FROM %s ORDER BY id, dep", tableName));
}
@Test
public void testDeleteNonExistingRecords() {
createAndInitPartitionedTable();
sql("INSERT INTO TABLE %s VALUES (1, 'hr'), (2, 'hardware'), (null, 'hr')", tableName);
sql("DELETE FROM %s AS t WHERE t.id > 10", tableName);
Table table = validationCatalog.loadTable(tableIdent);
Assert.assertEquals("Should have 2 snapshots", 2, Iterables.size(table.snapshots()));
Snapshot currentSnapshot = table.currentSnapshot();
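// no rows match, so the overwrite changes 0 partitions and touches no data files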
validateSnapshot(currentSnapshot, "overwrite", "0", null, null);
assertEquals("Should have expected rows",
ImmutableList.of(row(1, "hr"), row(2, "hardware"), row(null, "hr")),
sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", tableName));
}
@Test
public void testDeleteWithoutCondition() {
createAndInitPartitionedTable();
sql("INSERT INTO TABLE %s VALUES (1, 'hr')", tableName);
sql("INSERT INTO TABLE %s VALUES (2, 'hardware')", tableName);
sql("INSERT INTO TABLE %s VALUES (null, 'hr')", tableName);
sql("DELETE FROM %s", tableName);
Table table = validationCatalog.loadTable(tableIdent);
Assert.assertEquals("Should have 4 snapshots", 4, Iterables.size(table.snapshots()));
// should be a delete instead of an overwrite as it is done through a metadata operation
Snapshot currentSnapshot = table.currentSnapshot();
validateSnapshot(currentSnapshot, "delete", "2", "3", null);
assertEquals("Should have expected rows",
ImmutableList.of(),
sql("SELECT * FROM %s", tableName));
}
@Test
public void testDeleteUsingMetadataWithComplexCondition() {
createAndInitPartitionedTable();
sql("INSERT INTO TABLE %s VALUES (1, 'dep1')", tableName);
sql("INSERT INTO TABLE %s VALUES (2, 'dep2')", tableName);
sql("INSERT INTO TABLE %s VALUES (null, 'dep3')", tableName);
sql("DELETE FROM %s WHERE dep > 'dep2' OR dep = CAST(4 AS STRING) OR dep = 'dep2'", tableName);
Table table = validationCatalog.loadTable(tableIdent);
Assert.assertEquals("Should have 4 snapshots", 4, Iterables.size(table.snapshots()));
// should be a delete instead of an overwrite as it is done through a metadata operation
Snapshot currentSnapshot = table.currentSnapshot();
validateSnapshot(currentSnapshot, "delete", "2", "2", null);
assertEquals("Should have expected rows",
ImmutableList.of(row(1, "dep1")),
sql("SELECT * FROM %s", tableName));
}
@Test
public void testDeleteWithArbitraryPartitionPredicates() {
createAndInitPartitionedTable();
sql("INSERT INTO TABLE %s VALUES (1, 'hr')", tableName);
sql("INSERT INTO TABLE %s VALUES (2, 'hardware')", tableName);
sql("INSERT INTO TABLE %s VALUES (null, 'hr')", tableName);
// %% is an escaped version of %
sql("DELETE FROM %s WHERE id = 10 OR dep LIKE '%%ware'", tableName);
Table table = validationCatalog.loadTable(tableIdent);
Assert.assertEquals("Should have 4 snapshots", 4, Iterables.size(table.snapshots()));
// should be an overwrite since it cannot be executed using a metadata operation
Snapshot currentSnapshot = table.currentSnapshot();
validateSnapshot(currentSnapshot, "overwrite", "1", "1", null);
assertEquals("Should have expected rows",
ImmutableList.of(row(1, "hr"), row(null, "hr")),
sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", tableName));
}
@Test
public void testDeleteWithNonDeterministicCondition() {
createAndInitPartitionedTable();
sql("INSERT INTO TABLE %s VALUES (1, 'hr'), (2, 'hardware')", tableName);
AssertHelpers.assertThrows("Should complain about non-deterministic expressions",
AnalysisException.class, "nondeterministic expressions are only allowed",
() -> sql("DELETE FROM %s WHERE id = 1 AND rand() > 0.5", tableName));
}
@Test
public void testDeleteWithFoldableConditions() {
createAndInitPartitionedTable();
sql("INSERT INTO TABLE %s VALUES (1, 'hr'), (2, 'hardware')", tableName);
// should keep all rows and not trigger execution
sql("DELETE FROM %s WHERE false", tableName);
assertEquals("Should have expected rows",
ImmutableList.of(row(1, "hr"), row(2, "hardware")),
sql("SELECT * FROM %s ORDER BY id", tableName));
// should keep all rows and not trigger execution
sql("DELETE FROM %s WHERE 50 <> 50", tableName);
assertEquals("Should have expected rows",
ImmutableList.of(row(1, "hr"), row(2, "hardware")),
sql("SELECT * FROM %s ORDER BY id", tableName));
// should keep all rows and not trigger execution
sql("DELETE FROM %s WHERE 1 > null", tableName);
assertEquals("Should have expected rows",
ImmutableList.of(row(1, "hr"), row(2, "hardware")),
sql("SELECT * FROM %s ORDER BY id", tableName));
// should remove all rows
sql("DELETE FROM %s WHERE 21 = 21", tableName);
assertEquals("Should have expected rows",
ImmutableList.of(),
sql("SELECT * FROM %s ORDER BY id", tableName));
Table table = validationCatalog.loadTable(tableIdent);
Assert.assertEquals("Should have 2 snapshots", 2, Iterables.size(table.snapshots()));
}
@Test
public void testDeleteWithNullConditions() {
createAndInitPartitionedTable();
sql("INSERT INTO TABLE %s VALUES (0, null), (1, 'hr'), (2, 'hardware'), (null, 'hr')", tableName);
// should keep all rows as null is never equal to null
sql("DELETE FROM %s WHERE dep = null", tableName);
assertEquals("Should have expected rows",
ImmutableList.of(row(0, null), row(1, "hr"), row(2, "hardware"), row(null, "hr")),
sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", tableName));
// null = 'software' -> null
// should delete using metadata operation only
sql("DELETE FROM %s WHERE dep = 'software'", tableName);
assertEquals("Should have expected rows",
ImmutableList.of(row(0, null), row(1, "hr"), row(2, "hardware"), row(null, "hr")),
sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", tableName));
// should delete using metadata operation only
sql("DELETE FROM %s WHERE dep <=> NULL", tableName);
assertEquals("Should have expected rows",
ImmutableList.of(row(1, "hr"), row(2, "hardware"), row(null, "hr")),
sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", tableName));
Table table = validationCatalog.loadTable(tableIdent);
Assert.assertEquals("Should have 3 snapshots", 3, Iterables.size(table.snapshots()));
Snapshot currentSnapshot = table.currentSnapshot();
validateSnapshot(currentSnapshot, "delete", "1", "1", null);
}
@Ignore // TODO: fails due to SPARK-33267
public void testDeleteWithInAndNotInConditions() {
createAndInitUnpartitionedTable();
sql("INSERT INTO TABLE %s VALUES (1, 'hr'), (2, 'hardware'), (null, 'hr')", tableName);
sql("DELETE FROM %s WHERE id IN (1, null)", tableName);
assertEquals("Should have expected rows",
ImmutableList.of(row(2, "hardware"), row(null, "hr")),
sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", tableName));
sql("DELETE FROM %s WHERE id NOT IN (null, 1)", tableName);
assertEquals("Should have expected rows",
ImmutableList.of(row(2, "hardware"), row(null, "hr")),
sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", tableName));
sql("DELETE FROM %s WHERE id NOT IN (1, 10)", tableName);
assertEquals("Should have expected rows",
ImmutableList.of(row(null, "hr")),
sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", tableName));
}
@Test
public void testDeleteWithMultipleRowGroupsParquet() throws NoSuchTableException {
Assume.assumeTrue(fileFormat.equalsIgnoreCase("parquet"));
createAndInitPartitionedTable();
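// use tiny row group and split sizes so the single 200-row data file spans multiple row groups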
sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%d')", tableName, PARQUET_ROW_GROUP_SIZE_BYTES, 100);
sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%d')", tableName, SPLIT_SIZE, 100);
List<Integer> ids = new ArrayList<>();
for (int id = 1; id <= 200; id++) {
ids.add(id);
}
Dataset<Row> df = spark.createDataset(ids, Encoders.INT())
.withColumnRenamed("value", "id")
.withColumn("dep", lit("hr"));
df.coalesce(1).writeTo(tableName).append();
Assert.assertEquals(200, spark.table(tableName).count());
// delete a record from one of two row groups and copy over the second one
sql("DELETE FROM %s WHERE id IN (200, 201)", tableName);
Assert.assertEquals(199, spark.table(tableName).count());
}
@Test
public void testDeleteWithConditionOnNestedColumn() {
createAndInitNestedColumnsTable();
sql("INSERT INTO TABLE %s VALUES (1, named_struct(\"c1\", 3, \"c2\", \"v1\"))", tableName);
sql("INSERT INTO TABLE %s VALUES (2, named_struct(\"c1\", 2, \"c2\", \"v2\"))", tableName);
sql("DELETE FROM %s WHERE complex.c1 = id + 2", tableName);
assertEquals("Should have expected rows",
ImmutableList.of(row(2)),
sql("SELECT id FROM %s", tableName));
sql("DELETE FROM %s t WHERE t.complex.c1 = id", tableName);
assertEquals("Should have expected rows",
ImmutableList.of(),
sql("SELECT id FROM %s", tableName));
}
@Test
public void testDeleteWithInSubquery() throws NoSuchTableException {
createAndInitUnpartitionedTable();
sql("INSERT INTO TABLE %s VALUES (1, 'hr'), (2, 'hardware'), (null, 'hr')", tableName);
createOrReplaceView("deleted_id", Arrays.asList(0, 1, null), Encoders.INT());
createOrReplaceView("deleted_dep", Arrays.asList("software", "hr"), Encoders.STRING());
sql("DELETE FROM %s WHERE id IN (SELECT * FROM deleted_id) AND dep IN (SELECT * from deleted_dep)", tableName);
assertEquals("Should have expected rows",
ImmutableList.of(row(2, "hardware"), row(null, "hr")),
sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", tableName));
append(new Employee(1, "hr"), new Employee(-1, "hr"));
assertEquals("Should have expected rows",
ImmutableList.of(row(-1, "hr"), row(1, "hr"), row(2, "hardware"), row(null, "hr")),
sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", tableName));
sql("DELETE FROM %s WHERE id IS NULL OR id IN (SELECT value + 2 FROM deleted_id)", tableName);
assertEquals("Should have expected rows",
ImmutableList.of(row(-1, "hr"), row(1, "hr")),
sql("SELECT * FROM %s ORDER BY id", tableName));
append(new Employee(null, "hr"), new Employee(2, "hr"));
assertEquals("Should have expected rows",
ImmutableList.of(row(-1, "hr"), row(1, "hr"), row(2, "hr"), row(null, "hr")),
sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", tableName));
sql("DELETE FROM %s WHERE id IN (SELECT value + 2 FROM deleted_id) AND dep = 'hr'", tableName);
assertEquals("Should have expected rows",
ImmutableList.of(row(-1, "hr"), row(1, "hr"), row(null, "hr")),
sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", tableName));
}
@Test
public void testDeleteWithMultiColumnInSubquery() throws NoSuchTableException {
createAndInitUnpartitionedTable();
append(new Employee(1, "hr"), new Employee(2, "hardware"), new Employee(null, "hr"));
List<Employee> deletedEmployees = Arrays.asList(new Employee(null, "hr"), new Employee(1, "hr"));
createOrReplaceView("deleted_employee", deletedEmployees, Encoders.bean(Employee.class));
sql("DELETE FROM %s WHERE (id, dep) IN (SELECT id, dep FROM deleted_employee)", tableName);
assertEquals("Should have expected rows",
ImmutableList.of(row(2, "hardware"), row(null, "hr")),
sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", tableName));
}
@Ignore // TODO: not supported since the SPARK-25154 fix is not yet available
public void testDeleteWithNotInSubquery() throws NoSuchTableException {
createAndInitUnpartitionedTable();
append(new Employee(1, "hr"), new Employee(2, "hardware"), new Employee(null, "hr"));
createOrReplaceView("deleted_id", Arrays.asList(-1, -2, null), Encoders.INT());
createOrReplaceView("deleted_dep", Arrays.asList("software", "hr"), Encoders.STRING());
// the file filter subquery (nested loop left-anti join) returns 0 records
sql("DELETE FROM %s WHERE id NOT IN (SELECT * FROM deleted_id)", tableName);
assertEquals("Should have expected rows",
ImmutableList.of(row(1, "hr"), row(2, "hardware"), row(null, "hr")),
sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", tableName));
sql("DELETE FROM %s WHERE id NOT IN (SELECT * FROM deleted_id WHERE value IS NOT NULL)", tableName);
assertEquals("Should have expected rows",
ImmutableList.of(row(null, "hr")),
sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", tableName));
sql("INSERT INTO TABLE %s VALUES (1, 'hr'), (2, 'hardware'), (null, 'hr')", tableName);
assertEquals("Should have expected rows",
ImmutableList.of(row(1, "hr"), row(2, "hardware"), row(null, "hr"), row(null, "hr")),
sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", tableName));
sql("DELETE FROM %s WHERE id NOT IN (SELECT * FROM deleted_id) OR dep IN ('software', 'hr')", tableName);
assertEquals("Should have expected rows",
ImmutableList.of(row(2, "hardware")),
sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", tableName));
}
@Test
public void testDeleteWithNotInSubqueryNotSupported() throws NoSuchTableException {
createAndInitUnpartitionedTable();
append(new Employee(1, "hr"), new Employee(2, "hardware"));
createOrReplaceView("deleted_id", Arrays.asList(-1, -2, null), Encoders.INT());
AssertHelpers.assertThrows("Should complain about NOT IN subquery",
AnalysisException.class, "Null-aware predicate sub-queries are not currently supported",
() -> sql("DELETE FROM %s WHERE id NOT IN (SELECT * FROM deleted_id)", tableName));
}
@Test
public void testDeleteOnNonIcebergTableNotSupported() throws NoSuchTableException {
createOrReplaceView("testtable", "{ \"c1\": -100, \"c2\": -200 }");
AssertHelpers.assertThrows("Delete is not supported for non iceberg table",
AnalysisException.class, "DELETE is only supported with v2 tables.",
() -> sql("DELETE FROM %s WHERE c1 = -100", "testtable"));
}
@Test
public void testDeleteWithExistSubquery() throws NoSuchTableException {
createAndInitUnpartitionedTable();
append(new Employee(1, "hr"), new Employee(2, "hardware"), new Employee(null, "hr"));
createOrReplaceView("deleted_id", Arrays.asList(-1, -2, null), Encoders.INT());
createOrReplaceView("deleted_dep", Arrays.asList("software", "hr"), Encoders.STRING());
sql("DELETE FROM %s t WHERE EXISTS (SELECT 1 FROM deleted_id d WHERE t.id = d.value)", tableName);
assertEquals("Should have expected rows",
ImmutableList.of(row(1, "hr"), row(2, "hardware"), row(null, "hr")),
sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", tableName));
sql("DELETE FROM %s t WHERE EXISTS (SELECT 1 FROM deleted_id d WHERE t.id = d.value + 2)", tableName);
assertEquals("Should have expected rows",
ImmutableList.of(row(2, "hardware"), row(null, "hr")),
sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", tableName));
sql("DELETE FROM %s t WHERE EXISTS (SELECT 1 FROM deleted_id d WHERE t.id = d.value) OR t.id IS NULL", tableName);
assertEquals("Should have expected rows",
ImmutableList.of(row(2, "hardware")),
sql("SELECT * FROM %s", tableName));
}
@Test
public void testDeleteWithNotExistsSubquery() throws NoSuchTableException {
createAndInitUnpartitionedTable();
append(new Employee(1, "hr"), new Employee(2, "hardware"), new Employee(null, "hr"));
createOrReplaceView("deleted_id", Arrays.asList(-1, -2, null), Encoders.INT());
createOrReplaceView("deleted_dep", Arrays.asList("software", "hr"), Encoders.STRING());
sql("DELETE FROM %s t WHERE NOT EXISTS (SELECT 1 FROM deleted_id d WHERE t.id = d.value + 2)", tableName);
assertEquals("Should have expected rows",
ImmutableList.of(row(1, "hr")),
sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", tableName));
String subquery = "SELECT 1 FROM deleted_id d WHERE t.id = d.value + 2";
sql("DELETE FROM %s t WHERE NOT EXISTS (%s) OR t.id = 1", tableName, subquery);
assertEquals("Should have expected rows",
ImmutableList.of(),
sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", tableName));
}
@Test
public void testDeleteWithScalarSubquery() throws NoSuchTableException {
createAndInitUnpartitionedTable();
append(new Employee(1, "hr"), new Employee(2, "hardware"), new Employee(null, "hr"));
createOrReplaceView("deleted_id", Arrays.asList(1, 100, null), Encoders.INT());
sql("DELETE FROM %s t WHERE id <= (SELECT min(value) FROM deleted_id)", tableName);
assertEquals("Should have expected rows",
ImmutableList.of(row(2, "hardware"), row(null, "hr")),
sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", tableName));
}
@Test
public void testDeleteThatRequiresGroupingBeforeWrite() throws NoSuchTableException {
createAndInitPartitionedTable();
append(new Employee(0, "hr"), new Employee(1, "hr"), new Employee(2, "hr"));
append(new Employee(0, "ops"), new Employee(1, "ops"), new Employee(2, "ops"));
append(new Employee(0, "hr"), new Employee(1, "hr"), new Employee(2, "hr"));
append(new Employee(0, "ops"), new Employee(1, "ops"), new Employee(2, "ops"));
createOrReplaceView("deleted_id", Arrays.asList(1, 100), Encoders.INT());
String originalNumOfShufflePartitions = spark.conf().get("spark.sql.shuffle.partitions");
try {
// set the num of shuffle partitions to 1 to ensure we have only 1 writing task
spark.conf().set("spark.sql.shuffle.partitions", "1");
sql("DELETE FROM %s t WHERE id IN (SELECT * FROM deleted_id)", tableName);
Assert.assertEquals("Should have expected num of rows", 8L, spark.table(tableName).count());
} finally {
spark.conf().set("spark.sql.shuffle.partitions", originalNumOfShufflePartitions);
}
}
@Test
public synchronized void testDeleteWithSerializableIsolation() throws InterruptedException {
// cannot run tests with concurrency for Hadoop tables without atomic renames
Assume.assumeFalse(catalogName.equalsIgnoreCase("testhadoop"));
createAndInitUnpartitionedTable();
sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')", tableName, DELETE_ISOLATION_LEVEL, "serializable");
ExecutorService executorService = MoreExecutors.getExitingExecutorService(
(ThreadPoolExecutor) Executors.newFixedThreadPool(2));
AtomicInteger barrier = new AtomicInteger(0);
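// the barrier interleaves the two threads: each round, both wait until the previous
// round's delete and append have finished (barrier == 2 * round) and then run concurrently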
// delete thread
Future<?> deleteFuture = executorService.submit(() -> {
for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) {
while (barrier.get() < numOperations * 2) {
sleep(10);
}
sql("DELETE FROM %s WHERE id = 1", tableName);
barrier.incrementAndGet();
}
});
// append thread
Future<?> appendFuture = executorService.submit(() -> {
for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) {
while (barrier.get() < numOperations * 2) {
sleep(10);
}
sql("INSERT INTO TABLE %s VALUES (1, 'hr')", tableName);
barrier.incrementAndGet();
}
});
try {
deleteFuture.get();
Assert.fail("Expected a validation exception");
} catch (ExecutionException e) {
Throwable sparkException = e.getCause();
Assert.assertThat(sparkException, CoreMatchers.instanceOf(SparkException.class));
Throwable validationException = sparkException.getCause();
Assert.assertThat(validationException, CoreMatchers.instanceOf(ValidationException.class));
String errMsg = validationException.getMessage();
Assert.assertThat(errMsg, CoreMatchers.containsString("Found conflicting files that can contain"));
} finally {
appendFuture.cancel(true);
}
executorService.shutdown();
Assert.assertTrue("Timeout", executorService.awaitTermination(2, TimeUnit.MINUTES));
}
@Test
public synchronized void testDeleteWithSnapshotIsolation() throws InterruptedException, ExecutionException {
// cannot run tests with concurrency for Hadoop tables without atomic renames
Assume.assumeFalse(catalogName.equalsIgnoreCase("testhadoop"));
createAndInitUnpartitionedTable();
sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')", tableName, DELETE_ISOLATION_LEVEL, "snapshot");
ExecutorService executorService = MoreExecutors.getExitingExecutorService(
(ThreadPoolExecutor) Executors.newFixedThreadPool(2));
AtomicInteger barrier = new AtomicInteger(0);
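// same barrier scheme as the serializable test, but bounded to 20 rounds because
// snapshot isolation is expected to let the concurrent deletes and appends succeed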
// delete thread
Future<?> deleteFuture = executorService.submit(() -> {
for (int numOperations = 0; numOperations < 20; numOperations++) {
while (barrier.get() < numOperations * 2) {
sleep(10);
}
sql("DELETE FROM %s WHERE id = 1", tableName);
barrier.incrementAndGet();
}
});
// append thread
Future<?> appendFuture = executorService.submit(() -> {
for (int numOperations = 0; numOperations < 20; numOperations++) {
while (barrier.get() < numOperations * 2) {
sleep(10);
}
sql("INSERT INTO TABLE %s VALUES (1, 'hr')", tableName);
barrier.incrementAndGet();
}
});
try {
deleteFuture.get();
} finally {
appendFuture.cancel(true);
}
executorService.shutdown();
Assert.assertTrue("Timeout", executorService.awaitTermination(2, TimeUnit.MINUTES));
}
// TODO: multiple stripes for ORC
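// a null expected count asserts that the corresponding property is absent from the snapshot summary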
protected void validateSnapshot(Snapshot snapshot, String operation, String changedPartitionCount,
String deletedDataFiles, String addedDataFiles) {
Assert.assertEquals("Operation must match", operation, snapshot.operation());
Assert.assertEquals("Changed partitions count must match",
changedPartitionCount,
snapshot.summary().get("changed-partition-count"));
Assert.assertEquals("Deleted data files count must match",
deletedDataFiles,
snapshot.summary().get("deleted-data-files"));
Assert.assertEquals("Added data files count must match",
addedDataFiles,
snapshot.summary().get("added-data-files"));
}
protected void createAndInitPartitionedTable() {
sql("CREATE TABLE %s (id INT, dep STRING) USING iceberg PARTITIONED BY (dep)", tableName);
initTable();
}
protected void createAndInitUnpartitionedTable() {
sql("CREATE TABLE %s (id INT, dep STRING) USING iceberg", tableName);
initTable();
}
protected void createAndInitNestedColumnsTable() {
sql("CREATE TABLE %s (id INT, complex STRUCT<c1:INT,c2:STRING>) USING iceberg", tableName);
initTable();
}
protected <T> void createOrReplaceView(String name, List<T> data, Encoder<T> encoder) {
spark.createDataset(data, encoder).createOrReplaceTempView(name);
}
protected void append(Employee... employees) throws NoSuchTableException {
List<Employee> input = Arrays.asList(employees);
Dataset<Row> inputDF = spark.createDataFrame(input, Employee.class);
inputDF.coalesce(1).writeTo(tableName).append();
}
protected void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}