/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.iceberg.nessie;

import com.dremio.nessie.error.NessieConflictException;
import com.dremio.nessie.error.NessieNotFoundException;
import com.dremio.nessie.model.Branch;
import com.dremio.nessie.model.ContentsKey;
import com.dremio.nessie.model.IcebergTable;
import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.Files;
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadataParser;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.types.Types;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import static org.apache.iceberg.TableMetadataParser.getFileExtension;
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;

public class TestNessieTable extends BaseTestIceberg {

private static final String BRANCH = "iceberg-table-test";
private static final String DB_NAME = "db";
private static final String TABLE_NAME = "tbl";
private static final TableIdentifier TABLE_IDENTIFIER = TableIdentifier.of(DB_NAME, TABLE_NAME);
private static final ContentsKey KEY = ContentsKey.of(DB_NAME, TABLE_NAME);
  private static final Schema schema = new Schema(Types.StructType.of(
      required(1, "id", Types.LongType.get())).fields());
  private static final Schema altered = new Schema(Types.StructType.of(
      required(1, "id", Types.LongType.get()),
      optional(2, "data", Types.LongType.get())).fields());

  private Path tableLocation;

public TestNessieTable() {
super(BRANCH);
  }

@Before
public void beforeEach() throws IOException {
super.beforeEach();
this.tableLocation = new Path(catalog.createTable(TABLE_IDENTIFIER, schema).location());
  }

@After
public void afterEach() throws Exception {
// drop the table data
if (tableLocation != null) {
tableLocation.getFileSystem(hadoopConfig).delete(tableLocation, true);
catalog.refresh();
catalog.dropTable(TABLE_IDENTIFIER, false);
}
super.afterEach();
  }

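  // Fetch the IcebergTable contents object that Nessie stores for the given key on the test branch.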
  private IcebergTable getTable(ContentsKey key) throws NessieNotFoundException {
return client.getContentsApi()
.getContents(key, BRANCH)
.unwrap(IcebergTable.class).get();
  }

@Test
public void testCreate() throws NessieNotFoundException, IOException {
    // the table created in beforeEach should be loadable through the catalog
String tableName = TABLE_IDENTIFIER.name();
Table icebergTable = catalog.loadTable(TABLE_IDENTIFIER);
// add a column
icebergTable.updateSchema().addColumn("mother", Types.LongType.get()).commit();
    // the table should also exist in Nessie under the expected key
    getTable(KEY);
    // check that the table location is in the expected place
    Assert.assertEquals(
        (temp.getRoot().toURI().toString() + DB_NAME + "/" + tableName).replace("//", "/"),
        getTableLocation(tableName));
    // only two metadata files should exist and no manifests, since no data was written
Assert.assertEquals(2, metadataVersionFiles(tableName).size());
Assert.assertEquals(0, manifestFiles(tableName).size());
  }

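  // Renaming should preserve the table's schema, spec, location, and current snapshot.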
@Test
public void testRename() {
String renamedTableName = "rename_table_name";
TableIdentifier renameTableIdentifier = TableIdentifier.of(TABLE_IDENTIFIER.namespace(),
renamedTableName);
Table original = catalog.loadTable(TABLE_IDENTIFIER);
catalog.renameTable(TABLE_IDENTIFIER, renameTableIdentifier);
Assert.assertFalse(catalog.tableExists(TABLE_IDENTIFIER));
Assert.assertTrue(catalog.tableExists(renameTableIdentifier));
Table renamed = catalog.loadTable(renameTableIdentifier);
Assert.assertEquals(original.schema().asStruct(), renamed.schema().asStruct());
Assert.assertEquals(original.spec(), renamed.spec());
Assert.assertEquals(original.location(), renamed.location());
Assert.assertEquals(original.currentSnapshot(), renamed.currentSnapshot());
Assert.assertTrue(catalog.dropTable(renameTableIdentifier));
  }

@Test
public void testDrop() {
Assert.assertTrue(catalog.tableExists(TABLE_IDENTIFIER));
Assert.assertTrue(catalog.dropTable(TABLE_IDENTIFIER));
Assert.assertFalse(catalog.tableExists(TABLE_IDENTIFIER));
  }

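  // Dropping with purge=false removes the table from the catalog but leaves
  // the data file and the manifest list on disk.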
@Test
public void testDropWithoutPurgeLeavesTableData() throws IOException {
Table table = catalog.loadTable(TABLE_IDENTIFIER);
String fileLocation = addRecordsToFile(table, "file");
DataFile file = DataFiles.builder(table.spec())
.withRecordCount(3)
.withPath(fileLocation)
.withFileSizeInBytes(Files.localInput(fileLocation).getLength())
.build();
table.newAppend().appendFile(file).commit();
String manifestListLocation =
table.currentSnapshot().manifestListLocation().replace("file:", "");
Assert.assertTrue(catalog.dropTable(TABLE_IDENTIFIER, false));
Assert.assertFalse(catalog.tableExists(TABLE_IDENTIFIER));
Assert.assertTrue(new File(fileLocation).exists());
Assert.assertTrue(new File(manifestListLocation).exists());
  }

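  // Dropping the table removes only the catalog entry: data files, manifests,
  // the manifest list, and the current metadata file all remain on disk.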
@Test
public void testDropTable() throws IOException {
Table table = catalog.loadTable(TABLE_IDENTIFIER);
String location1 = addRecordsToFile(table, "file1");
String location2 = addRecordsToFile(table, "file2");
    DataFile file1 = DataFiles.builder(table.spec())
        .withRecordCount(3)
        .withPath(location1)
        .withFileSizeInBytes(Files.localInput(location1).getLength())
        .build();
    DataFile file2 = DataFiles.builder(table.spec())
        .withRecordCount(3)
        .withPath(location2)
        .withFileSizeInBytes(Files.localInput(location2).getLength())
        .build();
// add both data files
table.newAppend().appendFile(file1).appendFile(file2).commit();
// delete file2
table.newDelete().deleteFile(file2.path()).commit();
String manifestListLocation =
table.currentSnapshot().manifestListLocation().replace("file:", "");
List<ManifestFile> manifests = table.currentSnapshot().allManifests();
Assert.assertTrue(catalog.dropTable(TABLE_IDENTIFIER));
Assert.assertFalse(catalog.tableExists(TABLE_IDENTIFIER));
Assert.assertTrue(new File(location1).exists());
Assert.assertTrue(new File(location2).exists());
Assert.assertTrue(new File(manifestListLocation).exists());
for (ManifestFile manifest : manifests) {
Assert.assertTrue(new File(manifest.path().replace("file:", "")).exists());
}
Assert.assertTrue(new File(
((HasTableOperations) table).operations()
.current()
.metadataFileLocation()
.replace("file:", ""))
.exists());
  }

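  // A schema update commits a new metadata version, so two versions should exist.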
@Test
public void testExistingTableUpdate() {
Table icebergTable = catalog.loadTable(TABLE_IDENTIFIER);
// add a column
icebergTable.updateSchema().addColumn("data", Types.LongType.get()).commit();
icebergTable = catalog.loadTable(TABLE_IDENTIFIER);
    // only two metadata files should exist and no manifests, since no data was written
Assert.assertEquals(2, metadataVersionFiles(TABLE_NAME).size());
Assert.assertEquals(0, manifestFiles(TABLE_NAME).size());
Assert.assertEquals(altered.asStruct(), icebergTable.schema().asStruct());
  }

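  // Point the Nessie key at an unrelated metadata file to simulate a concurrent
  // change; the table's next commit is then based on a stale reference and must fail.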
@Test
public void testFailure() throws NessieNotFoundException, NessieConflictException {
Table icebergTable = catalog.loadTable(TABLE_IDENTIFIER);
Branch branch = (Branch) client.getTreeApi().getReferenceByName(BRANCH);
    // make sure the table exists on the branch before tampering with it
    client.getContentsApi().getContents(KEY, BRANCH).unwrap(IcebergTable.class).get();
client.getContentsApi().setContents(KEY, branch.getName(), branch.getHash(), "",
IcebergTable.of("dummytable.metadata.json"));
    AssertHelpers.assertThrows("schema update should fail because the ref is no longer up to date",
CommitFailedException.class,
() -> icebergTable.updateSchema().addColumn("data", Types.LongType.get()).commit());
  }

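  // Exactly one table, the one created in beforeEach, should be listed in the namespace.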
@Test
public void testListTables() {
    List<TableIdentifier> tableIdents = catalog.listTables(TABLE_IDENTIFIER.namespace());
    List<TableIdentifier> matchingIdents = tableIdents.stream()
        .filter(t -> t.namespace().level(0).equals(DB_NAME) && t.name().equals(TABLE_NAME))
        .collect(Collectors.toList());
    Assert.assertEquals(1, matchingIdents.size());
Assert.assertTrue(catalog.tableExists(TABLE_IDENTIFIER));
  }

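  // The helpers below resolve the on-disk layout of a table created by this catalog.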
private String getTableBasePath(String tableName) {
String databasePath = temp.getRoot().toString() + "/" + DB_NAME;
return Paths.get(databasePath, tableName).toAbsolutePath().toString();
  }

protected Path getTableLocationPath(String tableName) {
return new Path("file", null, Paths.get(getTableBasePath(tableName)).toString());
  }

protected String getTableLocation(String tableName) {
return getTableLocationPath(tableName).toString();
  }

private String metadataLocation(String tableName) {
return Paths.get(getTableBasePath(tableName), "metadata").toString();
  }

private List<String> metadataFiles(String tableName) {
return Arrays.stream(new File(metadataLocation(tableName)).listFiles())
.map(File::getAbsolutePath)
.collect(Collectors.toList());
  }

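  // Metadata version files are the uncompressed *.metadata.json files in the metadata directory.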
protected List<String> metadataVersionFiles(String tableName) {
return filterByExtension(tableName, getFileExtension(TableMetadataParser.Codec.NONE));
  }

protected List<String> manifestFiles(String tableName) {
return filterByExtension(tableName, ".avro");
  }

private List<String> filterByExtension(String tableName, String extension) {
return metadataFiles(tableName)
.stream()
.filter(f -> f.endsWith(extension))
.collect(Collectors.toList());
  }

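  // Writes three records to a new Avro file under the table's data/ directory
  // and returns the file's location.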
private static String addRecordsToFile(Table table, String filename) throws IOException {
GenericRecordBuilder recordBuilder =
new GenericRecordBuilder(AvroSchemaUtil.convert(schema, "test"));
List<GenericData.Record> records = new ArrayList<>();
records.add(recordBuilder.set("id", 1L).build());
records.add(recordBuilder.set("id", 2L).build());
records.add(recordBuilder.set("id", 3L).build());
String fileLocation = table.location().replace("file:", "") +
String.format("/data/%s.avro", filename);
try (FileAppender<GenericData.Record> writer = Avro.write(Files.localOutput(fileLocation))
.schema(schema)
.named("test")
.build()) {
for (GenericData.Record rec : records) {
writer.add(rec);
}
}
return fileLocation;
  }

}