Hadoop: Enhance version-hint.txt recovery with file listing (#1465)
diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java
index 3add626..2c33dbe 100644
--- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java
+++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java
@@ -25,8 +25,11 @@
import java.nio.charset.StandardCharsets;
import java.util.Set;
import java.util.UUID;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.LocationProviders;
@@ -55,6 +58,7 @@
*/
public class HadoopTableOperations implements TableOperations {
private static final Logger LOG = LoggerFactory.getLogger(HadoopTableOperations.class);
+ private static final Pattern VERSION_PATTERN = Pattern.compile("v([^\\.]*)\\..*");
private final Configuration conf;
private final Path location;
@@ -91,7 +95,7 @@
@Override
public TableMetadata refresh() {
- int ver = version != null ? version : readVersionHint();
+ int ver = version != null ? version : findVersion();
try {
Path metadataFile = getMetadataFile(ver);
if (version == null && metadataFile == null && ver == 0) {
@@ -230,7 +234,8 @@
};
}
- private Path getMetadataFile(int metadataVersion) throws IOException {
+ @VisibleForTesting
+ Path getMetadataFile(int metadataVersion) throws IOException {
for (TableMetadataParser.Codec codec : TableMetadataParser.Codec.values()) {
Path metadataFile = metadataFilePath(metadataVersion, codec);
FileSystem fs = getFileSystem(metadataFile, conf);
@@ -260,7 +265,24 @@
}
private Path metadataPath(String filename) {
- return new Path(new Path(location, "metadata"), filename);
+ return new Path(metadataRoot(), filename);
+ }
+
+ private Path metadataRoot() {
+ return new Path(location, "metadata");
+ }
+
+ private int version(String fileName) {
+ Matcher matcher = VERSION_PATTERN.matcher(fileName);
+ if (!matcher.matches()) {
+ return -1;
+ }
+ String versionNumber = matcher.group(1);
+ try {
+ return Integer.parseInt(versionNumber);
+ } catch (NumberFormatException ne) {
+ return -1;
+ }
}
@VisibleForTesting
@@ -280,7 +302,7 @@
}
@VisibleForTesting
- int readVersionHint() {
+ int findVersion() {
Path versionHintFile = versionHintFile();
FileSystem fs = Util.getFs(versionHintFile, conf);
@@ -289,19 +311,30 @@
return Integer.parseInt(in.readLine().replace("\n", ""));
} catch (Exception e) {
- LOG.warn("Error reading version hint file {}", versionHintFile, e);
try {
- if (getMetadataFile(1) != null) {
- // We just assume corrupted metadata and start to read from the first version file
- return 1;
+ if (fs.exists(metadataRoot())) {
+ LOG.warn("Error reading version hint file {}", versionHintFile, e);
+ } else {
+ LOG.debug("Metadata for table not found in directory {}", metadataRoot(), e);
+ return 0;
}
- } catch (IOException io) {
- // We log this error only on debug level since this is just a problem in recovery path
- LOG.debug("Error trying to recover version-hint.txt data for {}", versionHintFile, e);
- }
- // We just return 0 as not able to recover easily
- return 0;
+ // List the metadata directory to find the version files, and try to recover the max available version
+ FileStatus[] files = fs.listStatus(metadataRoot(), name -> VERSION_PATTERN.matcher(name.getName()).matches());
+ int maxVersion = 0;
+
+ for (FileStatus file : files) {
+ int currentVersion = version(file.getPath().getName());
+ if (currentVersion > maxVersion && getMetadataFile(currentVersion) != null) {
+ maxVersion = currentVersion;
+ }
+ }
+
+ return maxVersion;
+ } catch (IOException io) {
+ LOG.warn("Error trying to recover version-hint.txt data for {}", versionHintFile, e);
+ return 0;
+ }
}
}
diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopTables.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopTables.java
index a00d938..a1d0478 100644
--- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopTables.java
+++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopTables.java
@@ -36,6 +36,7 @@
import org.apache.iceberg.Tables;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.util.Pair;
@@ -143,7 +144,8 @@
return new BaseTable(ops, location);
}
- private TableOperations newTableOps(String location) {
+ @VisibleForTesting
+ TableOperations newTableOps(String location) {
if (location.contains(METADATA_JSON)) {
return new StaticTableOperations(location, new HadoopFileIO(conf));
} else {
diff --git a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCatalog.java b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCatalog.java
index 6aafe1e..3cff157 100644
--- a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCatalog.java
+++ b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCatalog.java
@@ -37,6 +37,7 @@
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.PositionOutputStream;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -448,16 +449,83 @@
}
@Test
- public void testVersionHintFile() throws Exception {
- Configuration conf = new Configuration();
- String warehousePath = temp.newFolder().getAbsolutePath();
- HadoopCatalog catalog = new HadoopCatalog(conf, warehousePath);
+ public void testVersionHintFileErrorWithFile() throws Exception {
+ addVersionsToTable(table);
- // Create a test table with multiple versions
- TableIdentifier tableId = TableIdentifier.of("tbl");
- Table table = catalog.createTable(tableId, SCHEMA, PartitionSpec.unpartitioned());
- HadoopTableOperations tableOperations = (HadoopTableOperations) catalog.newTableOps(tableId);
+ HadoopTableOperations tableOperations = (HadoopTableOperations) TABLES.newTableOps(tableLocation);
+ long secondSnapshotId = table.currentSnapshot().snapshotId();
+
+ // Write old data to confirm that we are writing the correct file
+ FileIO io = table.io();
+ io.deleteFile(versionHintFile.getPath());
+ try (PositionOutputStream stream = io.newOutputFile(versionHintFile.getPath()).create()) {
+ stream.write("1".getBytes(StandardCharsets.UTF_8));
+ }
+
+ // Check the result of the findVersion(), and load the table and check the current snapshotId
+ Assert.assertEquals(1, tableOperations.findVersion());
+ Assert.assertEquals(secondSnapshotId, TABLES.load(tableLocation).currentSnapshot().snapshotId());
+
+ // Write newer data to confirm that we are writing the correct file
+ io.deleteFile(versionHintFile.getPath());
+ try (PositionOutputStream stream = io.newOutputFile(versionHintFile.getPath()).create()) {
+ stream.write("3".getBytes(StandardCharsets.UTF_8));
+ }
+
+ // Check the result of the findVersion(), and load the table and check the current snapshotId
+ Assert.assertEquals(3, tableOperations.findVersion());
+ Assert.assertEquals(secondSnapshotId, TABLES.load(tableLocation).currentSnapshot().snapshotId());
+
+ // Write an empty version hint file
+ io.deleteFile(versionHintFile.getPath());
+ io.newOutputFile(versionHintFile.getPath()).create().close();
+
+ // Check the result of the findVersion(), and load the table and check the current snapshotId
+ Assert.assertEquals(3, tableOperations.findVersion());
+ Assert.assertEquals(secondSnapshotId, TABLES.load(tableLocation).currentSnapshot().snapshotId());
+
+ // Just delete the file
+ io.deleteFile(versionHintFile.getPath());
+
+    // Check the result of the findVersion(), and load the table and check the current snapshotId
+ Assert.assertEquals(3, tableOperations.findVersion());
+ Assert.assertEquals(secondSnapshotId, TABLES.load(tableLocation).currentSnapshot().snapshotId());
+ }
+
+ @Test
+ public void testVersionHintFileMissingMetadata() throws Exception {
+ addVersionsToTable(table);
+
+ HadoopTableOperations tableOperations = (HadoopTableOperations) TABLES.newTableOps(tableLocation);
+
+ long secondSnapshotId = table.currentSnapshot().snapshotId();
+
+ // Write old data to confirm that we are writing the correct file
+ FileIO io = table.io();
+ io.deleteFile(versionHintFile.getPath());
+
+ // Remove the first version file, and see if we can recover
+ io.deleteFile(tableOperations.getMetadataFile(1).toString());
+
+ // Check the result of the findVersion(), and load the table and check the current snapshotId
+ Assert.assertEquals(3, tableOperations.findVersion());
+ Assert.assertEquals(secondSnapshotId, TABLES.load(tableLocation).currentSnapshot().snapshotId());
+
+ // Remove all the version files, and see if we can recover. Hint... not :)
+ io.deleteFile(tableOperations.getMetadataFile(2).toString());
+ io.deleteFile(tableOperations.getMetadataFile(3).toString());
+
+    // Check that findVersion() returns 0, and a NoSuchTableException is thrown when trying to load the table
+ Assert.assertEquals(0, tableOperations.findVersion());
+ AssertHelpers.assertThrows(
+ "Should not be able to find the table",
+ NoSuchTableException.class,
+ "Table does not exist",
+ () -> TABLES.load(tableLocation));
+ }
+
+ private static void addVersionsToTable(Table table) {
DataFile dataFile1 = DataFiles.builder(SPEC)
.withPath("/a.parquet")
.withFileSizeInBytes(10)
@@ -472,45 +540,5 @@
table.newAppend().appendFile(dataFile1).commit();
table.newAppend().appendFile(dataFile2).commit();
- long secondSnapshotId = table.currentSnapshot().snapshotId();
-
- // Get the version-hint.text file location
- String versionHintLocation = tableOperations.versionHintFile().toString();
-
- // Write old data to confirm that we are writing the correct file
- FileIO io = new HadoopFileIO(conf);
- io.deleteFile(versionHintLocation);
- try (PositionOutputStream stream = io.newOutputFile(versionHintLocation).create()) {
- stream.write("1".getBytes(StandardCharsets.UTF_8));
- }
-
- // Check the result of the readVersionHint(), and load the table and check the current snapshotId
- Assert.assertEquals(1, tableOperations.readVersionHint());
- Assert.assertEquals(secondSnapshotId, catalog.loadTable(tableId).currentSnapshot().snapshotId());
-
- // Write newer data to confirm that we are writing the correct file
- io.deleteFile(versionHintLocation);
- try (PositionOutputStream stream = io.newOutputFile(versionHintLocation).create()) {
- stream.write("3".getBytes(StandardCharsets.UTF_8));
- }
-
- // Check the result of the readVersionHint(), and load the table and check the current snapshotId
- Assert.assertEquals(3, tableOperations.readVersionHint());
- Assert.assertEquals(secondSnapshotId, catalog.loadTable(tableId).currentSnapshot().snapshotId());
-
- // Write an empty version hint file
- io.deleteFile(versionHintLocation);
- io.newOutputFile(versionHintLocation).create().close();
-
- // Check the result of the readVersionHint(), and load the table and check the current snapshotId
- Assert.assertEquals(1, tableOperations.readVersionHint());
- Assert.assertEquals(secondSnapshotId, catalog.loadTable(tableId).currentSnapshot().snapshotId());
-
- // Just delete the file - double check that we have manipulated the correct file
- io.deleteFile(versionHintLocation);
-
- // Check the result of the readVersionHint(), and load the table and check the current snapshotId
- Assert.assertEquals(1, tableOperations.readVersionHint());
- Assert.assertEquals(secondSnapshotId, catalog.loadTable(tableId).currentSnapshot().snapshotId());
}
}