/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.iceberg.hadoop;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.PositionOutputStream;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.transforms.Transform;
import org.apache.iceberg.transforms.Transforms;
import org.apache.iceberg.types.Types;
import org.junit.Assert;
import org.junit.Test;

import static org.apache.iceberg.NullOrder.NULLS_FIRST;
import static org.apache.iceberg.SortDirection.ASC;

public class TestHadoopCatalog extends HadoopTableTestBase {
  private static ImmutableMap<String, String> meta = ImmutableMap.of();
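
  // Builder create: the schema, partition spec, and properties set via both
  // withProperty and withProperties should all be applied to the new table.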
  @Test
  public void testCreateTableBuilder() throws Exception {
    Configuration conf = new Configuration();
    String warehousePath = temp.newFolder().getAbsolutePath();
    HadoopCatalog catalog = new HadoopCatalog(conf, warehousePath);
    TableIdentifier tableIdent = TableIdentifier.of("db", "ns1", "ns2", "tbl");
    Table table = catalog.buildTable(tableIdent, SCHEMA)
        .withPartitionSpec(SPEC)
        .withProperties(null) // a null map should be ignored, not throw
        .withProperty("key1", "value1")
        .withProperties(ImmutableMap.of("key2", "value2"))
        .create();
    Assert.assertEquals(TABLE_SCHEMA.toString(), table.schema().toString());
    Assert.assertEquals(1, table.spec().fields().size());
    Assert.assertEquals("value1", table.properties().get("key1"));
    Assert.assertEquals("value2", table.properties().get("key2"));
  }
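
  // Create transaction: committing the transaction should produce a loadable,
  // unpartitioned table.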
  @Test
  public void testCreateTableTxnBuilder() throws Exception {
    Configuration conf = new Configuration();
    String warehousePath = temp.newFolder().getAbsolutePath();
    HadoopCatalog catalog = new HadoopCatalog(conf, warehousePath);
    TableIdentifier tableIdent = TableIdentifier.of("db", "ns1", "ns2", "tbl");
    Transaction txn = catalog.buildTable(tableIdent, SCHEMA)
        .withPartitionSpec(null) // a null spec should default to unpartitioned
        .createTransaction();
    txn.commitTransaction();
    Table table = catalog.loadTable(tableIdent);
    Assert.assertEquals(TABLE_SCHEMA.toString(), table.schema().toString());
    Assert.assertTrue(table.spec().isUnpartitioned());
  }
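
  // Replace transaction: replacing the table clears its snapshots and partition
  // spec, but merges the new properties into the existing ones.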
  @Test
  public void testReplaceTxnBuilder() throws Exception {
    Configuration conf = new Configuration();
    String warehousePath = temp.newFolder().getAbsolutePath();
    HadoopCatalog catalog = new HadoopCatalog(conf, warehousePath);
    TableIdentifier tableIdent = TableIdentifier.of("db", "ns1", "ns2", "tbl");
    Transaction createTxn = catalog.buildTable(tableIdent, SCHEMA)
        .withPartitionSpec(SPEC)
        .withProperty("key1", "value1")
        .createOrReplaceTransaction();
    createTxn.newAppend()
        .appendFile(FILE_A)
        .commit();
    createTxn.commitTransaction();
    Table table = catalog.loadTable(tableIdent);
    Assert.assertNotNull(table.currentSnapshot());
    Transaction replaceTxn = catalog.buildTable(tableIdent, SCHEMA)
        .withProperty("key2", "value2")
        .replaceTransaction();
    replaceTxn.commitTransaction();
    table = catalog.loadTable(tableIdent);
    Assert.assertNull(table.currentSnapshot());
    Assert.assertTrue(table.spec().isUnpartitioned());
    Assert.assertEquals("value1", table.properties().get("key1"));
    Assert.assertEquals("value2", table.properties().get("key2"));
  }
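
  // HadoopCatalog tables are path-based, so every create flavor must reject a
  // custom location.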
  @Test
  public void testTableBuilderWithLocation() throws Exception {
    Configuration conf = new Configuration();
    String warehousePath = temp.newFolder().getAbsolutePath();
    HadoopCatalog catalog = new HadoopCatalog(conf, warehousePath);
    TableIdentifier tableIdent = TableIdentifier.of("db", "ns1", "ns2", "tbl");
    AssertHelpers.assertThrows("Should reject a custom location",
        IllegalArgumentException.class, "Cannot set a custom location for a path-based table",
        () -> catalog.buildTable(tableIdent, SCHEMA).withLocation("custom").create());
    AssertHelpers.assertThrows("Should reject a custom location",
        IllegalArgumentException.class, "Cannot set a custom location for a path-based table",
        () -> catalog.buildTable(tableIdent, SCHEMA).withLocation("custom").createTransaction());
    AssertHelpers.assertThrows("Should reject a custom location",
        IllegalArgumentException.class, "Cannot set a custom location for a path-based table",
        () -> catalog.buildTable(tableIdent, SCHEMA).withLocation("custom").createOrReplaceTransaction());
  }
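
  // Tables created without an explicit sort order should be unsorted (order id 0).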
  @Test
  public void testCreateTableDefaultSortOrder() throws Exception {
    Configuration conf = new Configuration();
    String warehousePath = temp.newFolder().getAbsolutePath();
    HadoopCatalog catalog = new HadoopCatalog(conf, warehousePath);
    TableIdentifier tableIdent = TableIdentifier.of("db", "ns1", "ns2", "tbl");
    Table table = catalog.createTable(tableIdent, SCHEMA, SPEC);
    SortOrder sortOrder = table.sortOrder();
    Assert.assertEquals("Order ID must match", 0, sortOrder.orderId());
    Assert.assertTrue("Order must be unsorted", sortOrder.isUnsorted());
  }
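
  // A custom sort order should be persisted with its id, direction, null order,
  // and transform.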
  @Test
  public void testCreateTableCustomSortOrder() throws Exception {
    Configuration conf = new Configuration();
    String warehousePath = temp.newFolder().getAbsolutePath();
    HadoopCatalog catalog = new HadoopCatalog(conf, warehousePath);
    TableIdentifier tableIdent = TableIdentifier.of("db", "ns1", "ns2", "tbl");
    SortOrder order = SortOrder.builderFor(SCHEMA)
        .asc("id", NULLS_FIRST)
        .build();
    Table table = catalog.buildTable(tableIdent, SCHEMA)
        .withPartitionSpec(SPEC)
        .withSortOrder(order)
        .create();
    SortOrder sortOrder = table.sortOrder();
    Assert.assertEquals("Order ID must match", 1, sortOrder.orderId());
    Assert.assertEquals("Order must have 1 field", 1, sortOrder.fields().size());
    Assert.assertEquals("Direction must match", ASC, sortOrder.fields().get(0).direction());
    Assert.assertEquals("Null order must match", NULLS_FIRST, sortOrder.fields().get(0).nullOrder());
    Transform<?, ?> transform = Transforms.identity(Types.IntegerType.get());
    Assert.assertEquals("Transform must match", transform, sortOrder.fields().get(0).transform());
  }
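
  // Creating a table should materialize its metadata directory; dropping it
  // should remove the directory.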
  @Test
  public void testBasicCatalog() throws Exception {
    Configuration conf = new Configuration();
    String warehousePath = temp.newFolder().getAbsolutePath();
    HadoopCatalog catalog = new HadoopCatalog(conf, warehousePath);
    TableIdentifier testTable = TableIdentifier.of("db", "ns1", "ns2", "tbl");
    catalog.createTable(testTable, SCHEMA, PartitionSpec.unpartitioned());
    String metaLocation = catalog.defaultWarehouseLocation(testTable);
    FileSystem fs = Util.getFs(new Path(metaLocation), conf);
    Assert.assertTrue(fs.isDirectory(new Path(metaLocation)));
    catalog.dropTable(testTable);
    Assert.assertFalse(fs.isDirectory(new Path(metaLocation)));
  }
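
  // Same as above, but for an identifier with no namespace component.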
  @Test
  public void testCreateAndDropTableWithoutNamespace() throws Exception {
    Configuration conf = new Configuration();
    String warehousePath = temp.newFolder().getAbsolutePath();
    HadoopCatalog catalog = new HadoopCatalog(conf, warehousePath);
    TableIdentifier testTable = TableIdentifier.of("tbl");
    Table table = catalog.createTable(testTable, SCHEMA, PartitionSpec.unpartitioned());
    Assert.assertEquals(TABLE_SCHEMA.toString(), table.schema().toString());
    Assert.assertEquals("hadoop.tbl", table.toString());
    String metaLocation = catalog.defaultWarehouseLocation(testTable);
    FileSystem fs = Util.getFs(new Path(metaLocation), conf);
    Assert.assertTrue(fs.isDirectory(new Path(metaLocation)));
    catalog.dropTable(testTable);
    Assert.assertFalse(fs.isDirectory(new Path(metaLocation)));
  }
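
  // Dropping a table under a nested namespace should remove its warehouse directory.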
  @Test
  public void testDropTable() throws Exception {
    Configuration conf = new Configuration();
    String warehousePath = temp.newFolder().getAbsolutePath();
    HadoopCatalog catalog = new HadoopCatalog(conf, warehousePath);
    TableIdentifier testTable = TableIdentifier.of("db", "ns1", "ns2", "tbl");
    catalog.createTable(testTable, SCHEMA, PartitionSpec.unpartitioned());
    String metaLocation = catalog.defaultWarehouseLocation(testTable);
    FileSystem fs = Util.getFs(new Path(metaLocation), conf);
    Assert.assertTrue(fs.isDirectory(new Path(metaLocation)));
    catalog.dropTable(testTable);
    Assert.assertFalse(fs.isDirectory(new Path(metaLocation)));
  }
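
  // Renames are unsupported because a Hadoop table's location is derived from
  // its identifier.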
  @Test
  public void testRenameTable() throws Exception {
    Configuration conf = new Configuration();
    String warehousePath = temp.newFolder().getAbsolutePath();
    HadoopCatalog catalog = new HadoopCatalog(conf, warehousePath);
    TableIdentifier testTable = TableIdentifier.of("db", "tbl1");
    catalog.createTable(testTable, SCHEMA, PartitionSpec.unpartitioned());
    AssertHelpers.assertThrows("Should reject renaming a Hadoop table", UnsupportedOperationException.class,
        "Cannot rename Hadoop tables", () -> {
          catalog.renameTable(testTable, TableIdentifier.of("db", "tbl2"));
        }
    );
  }
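
  // listTables should return only tables directly under the given namespace and
  // fail for namespaces that don't exist.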
  @Test
  public void testListTables() throws Exception {
    Configuration conf = new Configuration();
    String warehousePath = temp.newFolder().getAbsolutePath();
    HadoopCatalog catalog = new HadoopCatalog(conf, warehousePath);
    TableIdentifier tbl1 = TableIdentifier.of("db", "tbl1");
    TableIdentifier tbl2 = TableIdentifier.of("db", "tbl2");
    TableIdentifier tbl3 = TableIdentifier.of("db", "ns1", "tbl3");
    TableIdentifier tbl4 = TableIdentifier.of("db", "metadata", "metadata");
    Lists.newArrayList(tbl1, tbl2, tbl3, tbl4).forEach(t ->
        catalog.createTable(t, SCHEMA, PartitionSpec.unpartitioned())
    );
    List<TableIdentifier> tbls1 = catalog.listTables(Namespace.of("db"));
    Set<String> tblSet = Sets.newHashSet(tbls1.stream().map(t -> t.name()).iterator());
    Assert.assertEquals(2, tblSet.size());
    Assert.assertTrue(tblSet.contains("tbl1"));
    Assert.assertTrue(tblSet.contains("tbl2"));
    List<TableIdentifier> tbls2 = catalog.listTables(Namespace.of("db", "ns1"));
    Assert.assertEquals(1, tbls2.size());
    Assert.assertEquals("tbl3", tbls2.get(0).name());
    AssertHelpers.assertThrows("Should fail to list tables in a namespace that doesn't exist",
        NoSuchNamespaceException.class,
        "Namespace does not exist: ", () -> {
          catalog.listTables(Namespace.of("db", "ns1", "ns2"));
        });
  }
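
  // Regression test: the location provider must be usable before the first
  // metadata commit.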
  @Test
  public void testCallingLocationProviderWhenNoCurrentMetadata() throws IOException {
    Configuration conf = new Configuration();
    String warehousePath = temp.newFolder().getAbsolutePath();
    HadoopCatalog catalog = new HadoopCatalog(conf, warehousePath);
    TableIdentifier tableIdent = TableIdentifier.of("ns1", "ns2", "table1");
    Transaction create = catalog.newCreateTableTransaction(tableIdent, SCHEMA);
    create.table().locationProvider(); // would NPE if missing current metadata is not handled
    create.commitTransaction();
    Assert.assertEquals("1 table expected", 1, catalog.listTables(Namespace.of("ns1", "ns2")).size());
    catalog.dropTable(tableIdent, true);
  }
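
  // createNamespace should create the backing directory and reject namespaces
  // that already exist.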
  @Test
  public void testCreateNamespace() throws Exception {
    Configuration conf = new Configuration();
    String warehousePath = temp.newFolder().getAbsolutePath();
    HadoopCatalog catalog = new HadoopCatalog(conf, warehousePath);
    TableIdentifier tbl1 = TableIdentifier.of("db", "ns1", "ns2", "metadata");
    TableIdentifier tbl2 = TableIdentifier.of("db", "ns2", "ns3", "tbl2");
    Lists.newArrayList(tbl1, tbl2).forEach(t ->
        catalog.createNamespace(t.namespace(), meta)
    );
    String metaLocation1 = warehousePath + "/" + "db/ns1/ns2";
    FileSystem fs1 = Util.getFs(new Path(metaLocation1), conf);
    Assert.assertTrue(fs1.isDirectory(new Path(metaLocation1)));
    String metaLocation2 = warehousePath + "/" + "db/ns2/ns3";
    FileSystem fs2 = Util.getFs(new Path(metaLocation2), conf);
    Assert.assertTrue(fs2.isDirectory(new Path(metaLocation2)));
    AssertHelpers.assertThrows("Should fail to create a namespace that already exists: " + tbl1.namespace(),
        org.apache.iceberg.exceptions.AlreadyExistsException.class,
        "Namespace already exists: " + tbl1.namespace(), () -> {
          catalog.createNamespace(tbl1.namespace());
        });
  }
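
  // listNamespaces should return only direct children of the given namespace,
  // or the top-level namespaces when called without arguments.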
  @Test
  public void testListNamespace() throws Exception {
    Configuration conf = new Configuration();
    String warehousePath = temp.newFolder().getAbsolutePath();
    HadoopCatalog catalog = new HadoopCatalog(conf, warehousePath);
    TableIdentifier tbl1 = TableIdentifier.of("db", "ns1", "ns2", "metadata");
    TableIdentifier tbl2 = TableIdentifier.of("db", "ns2", "ns3", "tbl2");
    TableIdentifier tbl3 = TableIdentifier.of("db", "ns3", "tbl4");
    TableIdentifier tbl4 = TableIdentifier.of("db", "metadata");
    TableIdentifier tbl5 = TableIdentifier.of("db2", "metadata");
    Lists.newArrayList(tbl1, tbl2, tbl3, tbl4, tbl5).forEach(t ->
        catalog.createTable(t, SCHEMA, PartitionSpec.unpartitioned())
    );
    List<Namespace> nsp1 = catalog.listNamespaces(Namespace.of("db"));
    Set<String> tblSet = Sets.newHashSet(nsp1.stream().map(t -> t.toString()).iterator());
    Assert.assertEquals(3, tblSet.size());
    Assert.assertTrue(tblSet.contains("db.ns1"));
    Assert.assertTrue(tblSet.contains("db.ns2"));
    Assert.assertTrue(tblSet.contains("db.ns3"));
    List<Namespace> nsp2 = catalog.listNamespaces(Namespace.of("db", "ns1"));
    Assert.assertEquals(1, nsp2.size());
    Assert.assertEquals("db.ns1.ns2", nsp2.get(0).toString());
    List<Namespace> nsp3 = catalog.listNamespaces();
    Set<String> tblSet2 = Sets.newHashSet(nsp3.stream().map(t -> t.toString()).iterator());
    Assert.assertEquals(2, tblSet2.size());
    Assert.assertTrue(tblSet2.contains("db"));
    Assert.assertTrue(tblSet2.contains("db2"));
    AssertHelpers.assertThrows("Should fail to list namespaces under a namespace that doesn't exist",
        NoSuchNamespaceException.class,
        "Namespace does not exist: ", () -> {
          catalog.listNamespaces(Namespace.of("db", "db2", "ns2"));
        });
  }
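
  // loadNamespaceMetadata should succeed for existing namespaces and fail for
  // missing ones.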
  @Test
  public void testLoadNamespaceMeta() throws IOException {
    Configuration conf = new Configuration();
    String warehousePath = temp.newFolder().getAbsolutePath();
    HadoopCatalog catalog = new HadoopCatalog(conf, warehousePath);
    TableIdentifier tbl1 = TableIdentifier.of("db", "ns1", "ns2", "metadata");
    TableIdentifier tbl2 = TableIdentifier.of("db", "ns2", "ns3", "tbl2");
    TableIdentifier tbl3 = TableIdentifier.of("db", "ns3", "tbl4");
    TableIdentifier tbl4 = TableIdentifier.of("db", "metadata");
    Lists.newArrayList(tbl1, tbl2, tbl3, tbl4).forEach(t ->
        catalog.createTable(t, SCHEMA, PartitionSpec.unpartitioned())
    );
    catalog.loadNamespaceMetadata(Namespace.of("db"));
    AssertHelpers.assertThrows("Should fail to load metadata for a namespace that doesn't exist",
        NoSuchNamespaceException.class,
        "Namespace does not exist: ", () -> {
          catalog.loadNamespaceMetadata(Namespace.of("db", "db2", "ns2"));
        });
  }
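
  // namespaceExists should return true for existing namespaces and false otherwise.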
  @Test
  public void testNamespaceExists() throws IOException {
    Configuration conf = new Configuration();
    String warehousePath = temp.newFolder().getAbsolutePath();
    HadoopCatalog catalog = new HadoopCatalog(conf, warehousePath);
    TableIdentifier tbl1 = TableIdentifier.of("db", "ns1", "ns2", "metadata");
    TableIdentifier tbl2 = TableIdentifier.of("db", "ns2", "ns3", "tbl2");
    TableIdentifier tbl3 = TableIdentifier.of("db", "ns3", "tbl4");
    TableIdentifier tbl4 = TableIdentifier.of("db", "metadata");
    Lists.newArrayList(tbl1, tbl2, tbl3, tbl4).forEach(t ->
        catalog.createTable(t, SCHEMA, PartitionSpec.unpartitioned())
    );
    Assert.assertTrue("Should return true when the namespace exists",
        catalog.namespaceExists(Namespace.of("db", "ns1", "ns2")));
    Assert.assertFalse("Should return false when the namespace doesn't exist",
        catalog.namespaceExists(Namespace.of("db", "db2", "ns2")));
  }
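
  // HadoopCatalog stores no namespace metadata, so setProperties is unsupported.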
  @Test
  public void testAlterNamespaceMeta() throws IOException {
    Configuration conf = new Configuration();
    String warehousePath = temp.newFolder().getAbsolutePath();
    HadoopCatalog catalog = new HadoopCatalog(conf, warehousePath);
    AssertHelpers.assertThrows("Should fail to set namespace properties", UnsupportedOperationException.class,
        "Cannot set namespace properties db.db2.ns2 : setProperties is not supported", () -> {
          catalog.setProperties(Namespace.of("db", "db2", "ns2"), ImmutableMap.of("property", "test"));
        });
  }
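
  // Only empty, existing namespaces can be dropped; dropping removes the
  // backing directory.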
  @Test
  public void testDropNamespace() throws IOException {
    Configuration conf = new Configuration();
    String warehousePath = temp.newFolder().getAbsolutePath();
    HadoopCatalog catalog = new HadoopCatalog(conf, warehousePath);
    Namespace namespace1 = Namespace.of("db");
    Namespace namespace2 = Namespace.of("db", "ns1");
    TableIdentifier tbl1 = TableIdentifier.of(namespace1, "tbl1");
    TableIdentifier tbl2 = TableIdentifier.of(namespace2, "tbl1");
    Lists.newArrayList(tbl1, tbl2).forEach(t ->
        catalog.createTable(t, SCHEMA, PartitionSpec.unpartitioned())
    );
    AssertHelpers.assertThrows("Should fail to drop a namespace that is not empty: " + namespace1,
        NamespaceNotEmptyException.class,
        "Namespace " + namespace1 + " is not empty.", () -> {
          catalog.dropNamespace(Namespace.of("db"));
        });
    Assert.assertFalse("Should return false when dropping a namespace that doesn't exist",
        catalog.dropNamespace(Namespace.of("db2")));
    Assert.assertTrue(catalog.dropTable(tbl1));
    Assert.assertTrue(catalog.dropTable(tbl2));
    Assert.assertTrue(catalog.dropNamespace(namespace2));
    Assert.assertTrue(catalog.dropNamespace(namespace1));
    String metaLocation = warehousePath + "/" + "db";
    FileSystem fs = Util.getFs(new Path(metaLocation), conf);
    Assert.assertFalse(fs.isDirectory(new Path(metaLocation)));
  }
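
  // findVersion() should follow the version hint file when present and fall
  // back to scanning the metadata files when the hint is empty or missing.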
  @Test
  public void testVersionHintFileErrorWithFile() throws Exception {
    addVersionsToTable(table);
    HadoopTableOperations tableOperations = (HadoopTableOperations) TABLES.newTableOps(tableLocation);
    long secondSnapshotId = table.currentSnapshot().snapshotId();
    // Write an older version to the hint file to confirm that findVersion() reads it
    FileIO io = table.io();
    io.deleteFile(versionHintFile.getPath());
    try (PositionOutputStream stream = io.newOutputFile(versionHintFile.getPath()).create()) {
      stream.write("1".getBytes(StandardCharsets.UTF_8));
    }
    // Check the result of findVersion(), then load the table and check the current snapshot id
    Assert.assertEquals(1, tableOperations.findVersion());
    Assert.assertEquals(secondSnapshotId, TABLES.load(tableLocation).currentSnapshot().snapshotId());
    // Write a newer version to confirm that findVersion() reads it as well
    io.deleteFile(versionHintFile.getPath());
    try (PositionOutputStream stream = io.newOutputFile(versionHintFile.getPath()).create()) {
      stream.write("3".getBytes(StandardCharsets.UTF_8));
    }
    // Check the result of findVersion(), then load the table and check the current snapshot id
    Assert.assertEquals(3, tableOperations.findVersion());
    Assert.assertEquals(secondSnapshotId, TABLES.load(tableLocation).currentSnapshot().snapshotId());
    // Write an empty version hint file
    io.deleteFile(versionHintFile.getPath());
    io.newOutputFile(versionHintFile.getPath()).create().close();
    // An empty hint file should fall back to scanning the metadata files
    Assert.assertEquals(3, tableOperations.findVersion());
    Assert.assertEquals(secondSnapshotId, TABLES.load(tableLocation).currentSnapshot().snapshotId());
    // Delete the hint file entirely
    io.deleteFile(versionHintFile.getPath());
    // A missing hint file should also fall back to scanning the metadata files
    Assert.assertEquals(3, tableOperations.findVersion());
    Assert.assertEquals(secondSnapshotId, TABLES.load(tableLocation).currentSnapshot().snapshotId());
  }
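
  // With the hint file gone, findVersion() should recover by scanning the
  // remaining metadata files; once all of them are deleted, the table is gone.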
  @Test
  public void testVersionHintFileMissingMetadata() throws Exception {
    addVersionsToTable(table);
    HadoopTableOperations tableOperations = (HadoopTableOperations) TABLES.newTableOps(tableLocation);
    long secondSnapshotId = table.currentSnapshot().snapshotId();
    // Remove the version hint file so findVersion() has to scan the metadata files
    FileIO io = table.io();
    io.deleteFile(versionHintFile.getPath());
    // Remove the first version file, and see if we can recover
    io.deleteFile(tableOperations.getMetadataFile(1).toString());
    // Check the result of findVersion(), then load the table and check the current snapshot id
    Assert.assertEquals(3, tableOperations.findVersion());
    Assert.assertEquals(secondSnapshotId, TABLES.load(tableLocation).currentSnapshot().snapshotId());
    // Remove all the version files, and see if we can recover. Hint... not :)
    io.deleteFile(tableOperations.getMetadataFile(2).toString());
    io.deleteFile(tableOperations.getMetadataFile(3).toString());
    // findVersion() should return 0, and loading the table should throw NoSuchTableException
    Assert.assertEquals(0, tableOperations.findVersion());
    AssertHelpers.assertThrows(
        "Should not be able to find the table",
        NoSuchTableException.class,
        "Table does not exist",
        () -> TABLES.load(tableLocation));
  }
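
  // Commits two appends so the table has several metadata versions (up to v3)
  // for the version-hint tests above.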
  private static void addVersionsToTable(Table table) {
    DataFile dataFile1 = DataFiles.builder(SPEC)
        .withPath("/a.parquet")
        .withFileSizeInBytes(10)
        .withRecordCount(1)
        .build();
    DataFile dataFile2 = DataFiles.builder(SPEC)
        .withPath("/b.parquet")
        .withFileSizeInBytes(10)
        .withRecordCount(1)
        .build();
    table.newAppend().appendFile(dataFile1).commit();
    table.newAppend().appendFile(dataFile2).commit();
  }
}