Flink: Support SQL IF EXISTS and IF NOT EXISTS clauses (#2135)
diff --git a/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java b/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
index 2fd5696..4917e9a 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
@@ -308,7 +308,7 @@
return toCatalogTable(table);
}
- Table loadIcebergTable(ObjectPath tablePath) throws TableNotExistException {
+ private Table loadIcebergTable(ObjectPath tablePath) throws TableNotExistException {
try {
Table table = icebergCatalog.loadTable(toIdentifier(tablePath));
if (cacheEnabled) {
@@ -332,7 +332,9 @@
try {
icebergCatalog.dropTable(toIdentifier(tablePath));
} catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
- throw new TableNotExistException(getName(), tablePath, e);
+ if (!ignoreIfNotExists) {
+ throw new TableNotExistException(getName(), tablePath, e);
+ }
}
}
@@ -344,7 +346,9 @@
toIdentifier(tablePath),
toIdentifier(new ObjectPath(tablePath.getDatabaseName(), newTableName)));
} catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
- throw new TableNotExistException(getName(), tablePath, e);
+ if (!ignoreIfNotExists) {
+ throw new TableNotExistException(getName(), tablePath, e);
+ }
} catch (AlreadyExistsException e) {
throw new TableAlreadyExistException(getName(), tablePath, e);
}
@@ -376,7 +380,9 @@
location,
properties.build());
} catch (AlreadyExistsException e) {
- throw new TableAlreadyExistException(getName(), tablePath, e);
+ if (!ignoreIfExists) {
+ throw new TableAlreadyExistException(getName(), tablePath, e);
+ }
}
}
@@ -384,7 +390,18 @@
public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists)
throws CatalogException, TableNotExistException {
validateFlinkTable(newTable);
- Table icebergTable = loadIcebergTable(tablePath);
+
+ Table icebergTable;
+ try {
+ icebergTable = loadIcebergTable(tablePath);
+ } catch (TableNotExistException e) {
+ if (!ignoreIfNotExists) {
+ throw e;
+ } else {
+ return;
+ }
+ }
+
CatalogTable table = toCatalogTable(icebergTable);
// Currently, Flink SQL only support altering table properties.
diff --git a/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java b/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java
index cbb078e..ced4e66 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java
@@ -57,6 +57,15 @@
sql("CREATE DATABASE %s", flinkDatabase);
Assert.assertTrue("Database should exist", validationNamespaceCatalog.namespaceExists(icebergNamespace));
+
+ sql("CREATE DATABASE IF NOT EXISTS %s", flinkDatabase);
+ Assert.assertTrue("Database should still exist", validationNamespaceCatalog.namespaceExists(icebergNamespace));
+
+ sql("DROP DATABASE IF EXISTS %s", flinkDatabase);
+ Assert.assertFalse("Database should be dropped", validationNamespaceCatalog.namespaceExists(icebergNamespace));
+
+ sql("CREATE DATABASE IF NOT EXISTS %s", flinkDatabase);
+ Assert.assertTrue("Database should be created", validationNamespaceCatalog.namespaceExists(icebergNamespace));
}
@Test
diff --git a/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java b/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
index 724fd11..bf1b091 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
@@ -44,6 +44,7 @@
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
@@ -51,6 +52,7 @@
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
public class TestFlinkCatalogTable extends FlinkCatalogTestBase {
@@ -124,6 +126,25 @@
Assert.assertEquals(Maps.newHashMap(), catalogTable.getOptions());
}
+ @Ignore("Enable this after upgrade flink to 1.12.0, because it starts to support 'CREATE TABLE IF NOT EXISTS")
+ @Test
+ public void testCreateTableIfNotExists() {
+ sql("CREATE TABLE tl(id BIGINT)");
+
+ // Assert that table does exist.
+ Assert.assertEquals(Maps.newHashMap(), table("tl").properties());
+
+ sql("DROP TABLE tl");
+ AssertHelpers.assertThrows("Table 'tl' should be dropped",
+ NoSuchTableException.class, "Table does not exist: db.tl", () -> table("tl"));
+
+ sql("CREATE TABLE IF NO EXISTS tl(id BIGINT)");
+ Assert.assertEquals(Maps.newHashMap(), table("tl").properties());
+
+ sql("CREATE TABLE IF NOT EXISTS tl(id BIGINT) WITH ('location'='/tmp/location')");
+ Assert.assertEquals("Should still be the old table.", Maps.newHashMap(), table("tl").properties());
+ }
+
@Test
public void testCreateTableLike() throws TableNotExistException {
sql("CREATE TABLE tl(id BIGINT)");