Flink: Support SQL IF EXISTS and IF NOT EXISTS clauses (#2135)

diff --git a/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java b/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
index 2fd5696..4917e9a 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
@@ -308,7 +308,7 @@
     return toCatalogTable(table);
   }
 
-  Table loadIcebergTable(ObjectPath tablePath) throws TableNotExistException {
+  private Table loadIcebergTable(ObjectPath tablePath) throws TableNotExistException {
     try {
       Table table = icebergCatalog.loadTable(toIdentifier(tablePath));
       if (cacheEnabled) {
@@ -332,7 +332,9 @@
     try {
       icebergCatalog.dropTable(toIdentifier(tablePath));
     } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
-      throw new TableNotExistException(getName(), tablePath, e);
+      if (!ignoreIfNotExists) {
+        throw new TableNotExistException(getName(), tablePath, e);
+      }
     }
   }
 
@@ -344,7 +346,9 @@
           toIdentifier(tablePath),
           toIdentifier(new ObjectPath(tablePath.getDatabaseName(), newTableName)));
     } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
-      throw new TableNotExistException(getName(), tablePath, e);
+      if (!ignoreIfNotExists) {
+        throw new TableNotExistException(getName(), tablePath, e);
+      }
     } catch (AlreadyExistsException e) {
       throw new TableAlreadyExistException(getName(), tablePath, e);
     }
@@ -376,7 +380,9 @@
           location,
           properties.build());
     } catch (AlreadyExistsException e) {
-      throw new TableAlreadyExistException(getName(), tablePath, e);
+      if (!ignoreIfExists) {
+        throw new TableAlreadyExistException(getName(), tablePath, e);
+      }
     }
   }
 
@@ -384,7 +390,18 @@
   public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists)
       throws CatalogException, TableNotExistException {
     validateFlinkTable(newTable);
-    Table icebergTable = loadIcebergTable(tablePath);
+
+    Table icebergTable;
+    try {
+      icebergTable = loadIcebergTable(tablePath);
+    } catch (TableNotExistException e) {
+      if (!ignoreIfNotExists) {
+        throw e;
+      } else {
+        return;
+      }
+    }
+
     CatalogTable table = toCatalogTable(icebergTable);
 
     // Currently, Flink SQL only support altering table properties.
diff --git a/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java b/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java
index cbb078e..ced4e66 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java
@@ -57,6 +57,15 @@
     sql("CREATE DATABASE %s", flinkDatabase);
 
     Assert.assertTrue("Database should exist", validationNamespaceCatalog.namespaceExists(icebergNamespace));
+
+    sql("CREATE DATABASE IF NOT EXISTS %s", flinkDatabase);
+    Assert.assertTrue("Database should still exist", validationNamespaceCatalog.namespaceExists(icebergNamespace));
+
+    sql("DROP DATABASE IF EXISTS %s", flinkDatabase);
+    Assert.assertFalse("Database should be dropped", validationNamespaceCatalog.namespaceExists(icebergNamespace));
+
+    sql("CREATE DATABASE IF NOT EXISTS %s", flinkDatabase);
+    Assert.assertTrue("Database should be created", validationNamespaceCatalog.namespaceExists(icebergNamespace));
   }
 
   @Test
diff --git a/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java b/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
index 724fd11..bf1b091 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
@@ -44,6 +44,7 @@
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Types;
@@ -51,6 +52,7 @@
 import org.junit.Assert;
 import org.junit.Assume;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 
 public class TestFlinkCatalogTable extends FlinkCatalogTestBase {
@@ -124,6 +126,25 @@
     Assert.assertEquals(Maps.newHashMap(), catalogTable.getOptions());
   }
 
+  @Ignore("Enable this after upgrade flink to 1.12.0, because it starts to support 'CREATE TABLE IF NOT EXISTS")
+  @Test
+  public void testCreateTableIfNotExists() {
+    sql("CREATE TABLE tl(id BIGINT)");
+
+    // Assert that table does exist.
+    Assert.assertEquals(Maps.newHashMap(), table("tl").properties());
+
+    sql("DROP TABLE tl");
+    AssertHelpers.assertThrows("Table 'tl' should be dropped",
+        NoSuchTableException.class, "Table does not exist: db.tl", () -> table("tl"));
+
+    sql("CREATE TABLE IF NO EXISTS tl(id BIGINT)");
+    Assert.assertEquals(Maps.newHashMap(), table("tl").properties());
+
+    sql("CREATE TABLE IF NOT EXISTS tl(id BIGINT) WITH ('location'='/tmp/location')");
+    Assert.assertEquals("Should still be the old table.", Maps.newHashMap(), table("tl").properties());
+  }
+
   @Test
   public void testCreateTableLike() throws TableNotExistException {
     sql("CREATE TABLE tl(id BIGINT)");