/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.flink;

import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableColumn;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.DataOperations;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;

public class TestFlinkCatalogTable extends FlinkCatalogTestBase {

public TestFlinkCatalogTable(String catalogName, Namespace baseNamespace) {
super(catalogName, baseNamespace);
}

@Before
public void before() {
super.before();
sql("CREATE DATABASE %s", flinkDatabase);
sql("USE CATALOG %s", catalogName);
sql("USE %s", DATABASE);
}

@After
public void cleanNamespaces() {
sql("DROP TABLE IF EXISTS %s.tl", flinkDatabase);
sql("DROP TABLE IF EXISTS %s.tl2", flinkDatabase);
sql("DROP DATABASE IF EXISTS %s", flinkDatabase);
super.clean();
}
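
// A table created through the Iceberg API should be visible to Flink, with Iceberg types
// mapped to the corresponding Flink types (long -> BIGINT, string -> STRING).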
@Test
public void testGetTable() {
validationCatalog.createTable(
TableIdentifier.of(icebergNamespace, "tl"),
new Schema(
Types.NestedField.optional(0, "id", Types.LongType.get()),
Types.NestedField.optional(1, "strV", Types.StringType.get())));
Assert.assertEquals(
Arrays.asList(
TableColumn.of("id", DataTypes.BIGINT()),
TableColumn.of("strV", DataTypes.STRING())),
getTableEnv().from("tl").getSchema().getTableColumns());
Assert.assertTrue(getTableEnv().getCatalog(catalogName).get().tableExists(ObjectPath.fromString("db.tl")));
}
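
// Renaming through Flink DDL should remove the old identifier and keep the schema under the new one.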
@Test
public void testRenameTable() {
Assume.assumeFalse("HadoopCatalog does not support rename table", isHadoopCatalog);
validationCatalog.createTable(
TableIdentifier.of(icebergNamespace, "tl"),
new Schema(Types.NestedField.optional(0, "id", Types.LongType.get())));
sql("ALTER TABLE tl RENAME TO tl2");
AssertHelpers.assertThrows(
"Should fail if trying to get a nonexistent table",
ValidationException.class,
"Table `tl` was not found.",
() -> getTableEnv().from("tl")
);
Assert.assertEquals(
Collections.singletonList(TableColumn.of("id", DataTypes.BIGINT())),
getTableEnv().from("tl2").getSchema().getTableColumns());
}
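
// CREATE TABLE through Flink DDL should produce an Iceberg table whose schema assigns field IDs starting from 1.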
@Test
public void testCreateTable() throws TableNotExistException {
sql("CREATE TABLE tl(id BIGINT)");
Table table = table("tl");
Assert.assertEquals(
new Schema(Types.NestedField.optional(1, "id", Types.LongType.get())).asStruct(),
table.schema().asStruct());
Assert.assertEquals(Maps.newHashMap(), table.properties());
CatalogTable catalogTable = catalogTable("tl");
Assert.assertEquals(TableSchema.builder().field("id", DataTypes.BIGINT()).build(), catalogTable.getSchema());
Assert.assertEquals(Maps.newHashMap(), catalogTable.getOptions());
}

@Ignore("Enable this after upgrading Flink to 1.12.0, which adds support for 'CREATE TABLE IF NOT EXISTS'.")
@Test
public void testCreateTableIfNotExists() {
sql("CREATE TABLE tl(id BIGINT)");
// Assert that the table exists.
Assert.assertEquals(Maps.newHashMap(), table("tl").properties());
sql("DROP TABLE tl");
AssertHelpers.assertThrows("Table 'tl' should be dropped",
NoSuchTableException.class, "Table does not exist: db.tl", () -> table("tl"));
sql("CREATE TABLE IF NO EXISTS tl(id BIGINT)");
Assert.assertEquals(Maps.newHashMap(), table("tl").properties());
sql("CREATE TABLE IF NOT EXISTS tl(id BIGINT) WITH ('location'='/tmp/location')");
Assert.assertEquals("Should still be the old table.", Maps.newHashMap(), table("tl").properties());
}
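
// CREATE TABLE LIKE should copy the source table's schema into a new, independent Iceberg table.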
@Test
public void testCreateTableLike() throws TableNotExistException {
sql("CREATE TABLE tl(id BIGINT)");
sql("CREATE TABLE tl2 LIKE tl");
Table table = table("tl2");
Assert.assertEquals(
new Schema(Types.NestedField.optional(1, "id", Types.LongType.get())).asStruct(),
table.schema().asStruct());
Assert.assertEquals(Maps.newHashMap(), table.properties());
CatalogTable catalogTable = catalogTable("tl2");
Assert.assertEquals(TableSchema.builder().field("id", DataTypes.BIGINT()).build(), catalogTable.getSchema());
Assert.assertEquals(Maps.newHashMap(), catalogTable.getOptions());
}

@Test
public void testCreateTableLocation() {
Assume.assumeFalse("HadoopCatalog does not support creating table with location", isHadoopCatalog);
sql("CREATE TABLE tl(id BIGINT) WITH ('location'='/tmp/location')");
Table table = table("tl");
Assert.assertEquals(
new Schema(Types.NestedField.optional(1, "id", Types.LongType.get())).asStruct(),
table.schema().asStruct());
Assert.assertEquals("/tmp/location", table.location());
Assert.assertEquals(Maps.newHashMap(), table.properties());
}

@Test
public void testCreatePartitionTable() throws TableNotExistException {
sql("CREATE TABLE tl(id BIGINT, dt STRING) PARTITIONED BY(dt)");
Table table = table("tl");
Assert.assertEquals(
new Schema(
Types.NestedField.optional(1, "id", Types.LongType.get()),
Types.NestedField.optional(2, "dt", Types.StringType.get())).asStruct(),
table.schema().asStruct());
Assert.assertEquals(PartitionSpec.builderFor(table.schema()).identity("dt").build(), table.spec());
Assert.assertEquals(Maps.newHashMap(), table.properties());
CatalogTable catalogTable = catalogTable("tl");
Assert.assertEquals(
TableSchema.builder().field("id", DataTypes.BIGINT()).field("dt", DataTypes.STRING()).build(),
catalogTable.getSchema());
Assert.assertEquals(Maps.newHashMap(), catalogTable.getOptions());
Assert.assertEquals(Collections.singletonList("dt"), catalogTable.getPartitionKeys());
}
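
// Loads a table partitioned by a hidden transform (bucket) and checks how it surfaces in Flink.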
@Test
public void testLoadTransformPartitionTable() throws TableNotExistException {
Schema schema = new Schema(Types.NestedField.optional(0, "id", Types.LongType.get()));
validationCatalog.createTable(
TableIdentifier.of(icebergNamespace, "tl"), schema,
PartitionSpec.builderFor(schema).bucket("id", 100).build());
CatalogTable catalogTable = catalogTable("tl");
Assert.assertEquals(
TableSchema.builder().field("id", DataTypes.BIGINT()).build(),
catalogTable.getSchema());
Assert.assertEquals(Maps.newHashMap(), catalogTable.getOptions());
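// non-identity transforms such as bucket(id, 100) cannot be represented as Flink partition keys, so none are exposed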
Assert.assertEquals(Collections.emptyList(), catalogTable.getPartitionKeys());
}

@Test
public void testAlterTable() throws TableNotExistException {
sql("CREATE TABLE tl(id BIGINT) WITH ('oldK'='oldV')");
Map<String, String> properties = Maps.newHashMap();
properties.put("oldK", "oldV");
// add a new property
sql("ALTER TABLE tl SET('newK'='newV')");
properties.put("newK", "newV");
Assert.assertEquals(properties, table("tl").properties());
// update an existing property
sql("ALTER TABLE tl SET('oldK'='oldV2')");
properties.put("oldK", "oldV2");
Assert.assertEquals(properties, table("tl").properties());
// remove a property; ALTER TABLE ... RESET is not available in this Flink version, so go through the Catalog API
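// A rough sketch of the equivalent change through the Iceberg API (not what this test exercises):
// table("tl").updateProperties().remove("oldK").commit();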
CatalogTable catalogTable = catalogTable("tl");
properties.remove("oldK");
getTableEnv().getCatalog(getTableEnv().getCurrentCatalog()).get().alterTable(
new ObjectPath(DATABASE, "tl"), catalogTable.copy(properties), false);
Assert.assertEquals(properties, table("tl").properties());
}

@Test
public void testRelocateTable() {
Assume.assumeFalse("HadoopCatalog does not support relocate table", isHadoopCatalog);
sql("CREATE TABLE tl(id BIGINT)");
sql("ALTER TABLE tl SET('location'='/tmp/location')");
Assert.assertEquals("/tmp/location", table("tl").location());
}
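
// Snapshot management via Flink table properties: 'cherry-pick-snapshot-id' applies a staged
// snapshot on top of the current state, and 'current-snapshot-id' rolls the table to an existing snapshot.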
@Test
public void testSetCurrentAndCherryPickSnapshotId() {
sql("CREATE TABLE tl(c1 INT, c2 STRING, c3 STRING) PARTITIONED BY (c1)");
Table table = table("tl");
DataFile fileA = DataFiles.builder(table.spec())
.withPath("/path/to/data-a.parquet")
.withFileSizeInBytes(10)
.withPartitionPath("c1=0") // easy way to set partition data for now
.withRecordCount(1)
.build();
DataFile fileB = DataFiles.builder(table.spec())
.withPath("/path/to/data-b.parquet")
.withFileSizeInBytes(10)
.withPartitionPath("c1=1") // easy way to set partition data for now
.withRecordCount(1)
.build();
DataFile replacementFile = DataFiles.builder(table.spec())
.withPath("/path/to/data-a-replacement.parquet")
.withFileSizeInBytes(10)
.withPartitionPath("c1=0") // easy way to set partition data for now
.withRecordCount(1)
.build();
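// commit fileA to create the table's first snapshot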
table.newAppend()
.appendFile(fileA)
.commit();
long snapshotId = table.currentSnapshot().snapshotId();
// stage an overwrite that replaces FILE_A
table.newReplacePartitions()
.addFile(replacementFile)
.stageOnly()
.commit();
Snapshot staged = Iterables.getLast(table.snapshots());
Assert.assertEquals("Should find the staged overwrite snapshot", DataOperations.OVERWRITE, staged.operation());
// add another append so that the original commit can't be fast-forwarded
table.newAppend()
.appendFile(fileB)
.commit();
// cherry-pick the staged overwrite: partition c1=0 is rewritten, replacing fileA with replacementFile
sql("ALTER TABLE tl SET('cherry-pick-snapshot-id'='%s')", staged.snapshotId());
validateTableFiles(table, fileB, replacementFile);
// roll back by making the first append the current snapshot; only fileA should remain
sql("ALTER TABLE tl SET('current-snapshot-id'='%s')", snapshotId);
validateTableFiles(table, fileA);
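// For reference, the same operations through the Iceberg API would be, roughly:
// table.manageSnapshots().cherrypick(staged.snapshotId()).commit();
// table.manageSnapshots().setCurrentSnapshot(snapshotId).commit();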
}
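
// Scans the table and asserts that exactly the given data files are planned.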
private void validateTableFiles(Table tbl, DataFile... expectedFiles) {
tbl.refresh();
Set<CharSequence> expectedFilePaths = Arrays.stream(expectedFiles).map(DataFile::path).collect(Collectors.toSet());
Set<CharSequence> actualFilePaths = StreamSupport.stream(tbl.newScan().planFiles().spliterator(), false)
.map(FileScanTask::file).map(ContentFile::path)
.collect(Collectors.toSet());
Assert.assertEquals("Files should match", expectedFilePaths, actualFilePaths);
}
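
// Loads a table directly through the Iceberg validation catalog, bypassing Flink.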
private Table table(String name) {
return validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, name));
}
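
// Looks up the Flink CatalogTable view of a table from the current Flink catalog.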
private CatalogTable catalogTable(String name) throws TableNotExistException {
return (CatalogTable) getTableEnv().getCatalog(getTableEnv().getCurrentCatalog()).get()
.getTable(new ObjectPath(DATABASE, name));
}
}