Hadoop: Enhance version-hint.txt recovery with file listing (#1465)

diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java
index 3add626..2c33dbe 100644
--- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java
+++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java
@@ -25,8 +25,11 @@
 import java.nio.charset.StandardCharsets;
 import java.util.Set;
 import java.util.UUID;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.LocationProviders;
@@ -55,6 +58,7 @@
  */
 public class HadoopTableOperations implements TableOperations {
   private static final Logger LOG = LoggerFactory.getLogger(HadoopTableOperations.class);
+  private static final Pattern VERSION_PATTERN = Pattern.compile("v([^\\.]*)\\..*");
 
   private final Configuration conf;
   private final Path location;
@@ -91,7 +95,7 @@
 
   @Override
   public TableMetadata refresh() {
-    int ver = version != null ? version : readVersionHint();
+    int ver = version != null ? version : findVersion();
     try {
       Path metadataFile = getMetadataFile(ver);
       if (version == null && metadataFile == null && ver == 0) {
@@ -230,7 +234,8 @@
     };
   }
 
-  private Path getMetadataFile(int metadataVersion) throws IOException {
+  @VisibleForTesting
+  Path getMetadataFile(int metadataVersion) throws IOException {
     for (TableMetadataParser.Codec codec : TableMetadataParser.Codec.values()) {
       Path metadataFile = metadataFilePath(metadataVersion, codec);
       FileSystem fs = getFileSystem(metadataFile, conf);
@@ -260,7 +265,24 @@
   }
 
   private Path metadataPath(String filename) {
-    return new Path(new Path(location, "metadata"), filename);
+    return new Path(metadataRoot(), filename);
+  }
+
+  private Path metadataRoot() {
+    return new Path(location, "metadata");
+  }
+
+  private int version(String fileName) {
+    Matcher matcher = VERSION_PATTERN.matcher(fileName);
+    if (!matcher.matches()) {
+      return -1;
+    }
+    String versionNumber = matcher.group(1);
+    try {
+      return Integer.parseInt(versionNumber);
+    } catch (NumberFormatException ne) {
+      return -1;
+    }
   }
 
   @VisibleForTesting
@@ -280,7 +302,7 @@
   }
 
   @VisibleForTesting
-  int readVersionHint() {
+  int findVersion() {
     Path versionHintFile = versionHintFile();
     FileSystem fs = Util.getFs(versionHintFile, conf);
 
@@ -289,19 +311,30 @@
       return Integer.parseInt(in.readLine().replace("\n", ""));
 
     } catch (Exception e) {
-      LOG.warn("Error reading version hint file {}", versionHintFile, e);
       try {
-        if (getMetadataFile(1) != null) {
-          // We just assume corrupted metadata and start to read from the first version file
-          return 1;
+        if (fs.exists(metadataRoot())) {
+          LOG.warn("Error reading version hint file {}", versionHintFile, e);
+        } else {
+          LOG.debug("Metadata for table not found in directory {}", metadataRoot(), e);
+          return 0;
         }
-      } catch (IOException io) {
-        // We log this error only on debug level since this is just a problem in recovery path
-        LOG.debug("Error trying to recover version-hint.txt data for {}", versionHintFile, e);
-      }
-      // We just return 0 as not able to recover easily
-      return 0;
 
+        // List the metadata directory to find the version files, and try to recover the max available version
+        FileStatus[] files = fs.listStatus(metadataRoot(), name -> VERSION_PATTERN.matcher(name.getName()).matches());
+        int maxVersion = 0;
+
+        for (FileStatus file : files) {
+          int currentVersion = version(file.getPath().getName());
+          if (currentVersion > maxVersion && getMetadataFile(currentVersion) != null) {
+            maxVersion = currentVersion;
+          }
+        }
+
+        return maxVersion;
+      } catch (IOException io) {
+        LOG.warn("Error trying to recover version-hint.txt data for {}", versionHintFile, e);
+        return 0;
+      }
     }
   }
 
diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopTables.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopTables.java
index a00d938..a1d0478 100644
--- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopTables.java
+++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopTables.java
@@ -36,6 +36,7 @@
 import org.apache.iceberg.Tables;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.util.Pair;
@@ -143,7 +144,8 @@
     return new BaseTable(ops, location);
   }
 
-  private TableOperations newTableOps(String location) {
+  @VisibleForTesting
+  TableOperations newTableOps(String location) {
     if (location.contains(METADATA_JSON)) {
       return new StaticTableOperations(location, new HadoopFileIO(conf));
     } else {
diff --git a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCatalog.java b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCatalog.java
index 6aafe1e..3cff157 100644
--- a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCatalog.java
+++ b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCatalog.java
@@ -37,6 +37,7 @@
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
 import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.PositionOutputStream;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -448,16 +449,83 @@
   }
 
   @Test
-  public void testVersionHintFile() throws Exception {
-    Configuration conf = new Configuration();
-    String warehousePath = temp.newFolder().getAbsolutePath();
-    HadoopCatalog catalog = new HadoopCatalog(conf, warehousePath);
+  public void testVersionHintFileErrorWithFile() throws Exception {
+    addVersionsToTable(table);
 
-    // Create a test table with multiple versions
-    TableIdentifier tableId = TableIdentifier.of("tbl");
-    Table table = catalog.createTable(tableId, SCHEMA, PartitionSpec.unpartitioned());
-    HadoopTableOperations tableOperations = (HadoopTableOperations) catalog.newTableOps(tableId);
+    HadoopTableOperations tableOperations = (HadoopTableOperations) TABLES.newTableOps(tableLocation);
 
+    long secondSnapshotId = table.currentSnapshot().snapshotId();
+
+    // Write old data to confirm that we are writing the correct file
+    FileIO io = table.io();
+    io.deleteFile(versionHintFile.getPath());
+    try (PositionOutputStream stream = io.newOutputFile(versionHintFile.getPath()).create()) {
+      stream.write("1".getBytes(StandardCharsets.UTF_8));
+    }
+
+    // Check the result of the findVersion(), and load the table and check the current snapshotId
+    Assert.assertEquals(1, tableOperations.findVersion());
+    Assert.assertEquals(secondSnapshotId, TABLES.load(tableLocation).currentSnapshot().snapshotId());
+
+    // Write newer data to confirm that we are writing the correct file
+    io.deleteFile(versionHintFile.getPath());
+    try (PositionOutputStream stream = io.newOutputFile(versionHintFile.getPath()).create()) {
+      stream.write("3".getBytes(StandardCharsets.UTF_8));
+    }
+
+    // Check the result of the findVersion(), and load the table and check the current snapshotId
+    Assert.assertEquals(3, tableOperations.findVersion());
+    Assert.assertEquals(secondSnapshotId, TABLES.load(tableLocation).currentSnapshot().snapshotId());
+
+    // Write an empty version hint file
+    io.deleteFile(versionHintFile.getPath());
+    io.newOutputFile(versionHintFile.getPath()).create().close();
+
+    // Check the result of the findVersion(), and load the table and check the current snapshotId
+    Assert.assertEquals(3, tableOperations.findVersion());
+    Assert.assertEquals(secondSnapshotId, TABLES.load(tableLocation).currentSnapshot().snapshotId());
+
+    // Just delete the file
+    io.deleteFile(versionHintFile.getPath());
+
+    // Check the result of the findVersion(), and load the table and check the current snapshotId
+    Assert.assertEquals(3, tableOperations.findVersion());
+    Assert.assertEquals(secondSnapshotId, TABLES.load(tableLocation).currentSnapshot().snapshotId());
+  }
+
+  @Test
+  public void testVersionHintFileMissingMetadata() throws Exception {
+    addVersionsToTable(table);
+
+    HadoopTableOperations tableOperations = (HadoopTableOperations) TABLES.newTableOps(tableLocation);
+
+    long secondSnapshotId = table.currentSnapshot().snapshotId();
+
+    // Write old data to confirm that we are writing the correct file
+    FileIO io = table.io();
+    io.deleteFile(versionHintFile.getPath());
+
+    // Remove the first version file, and see if we can recover
+    io.deleteFile(tableOperations.getMetadataFile(1).toString());
+
+    // Check the result of the findVersion(), and load the table and check the current snapshotId
+    Assert.assertEquals(3, tableOperations.findVersion());
+    Assert.assertEquals(secondSnapshotId, TABLES.load(tableLocation).currentSnapshot().snapshotId());
+
+    // Remove all the remaining version files, and verify that recovery is no longer possible
+    io.deleteFile(tableOperations.getMetadataFile(2).toString());
+    io.deleteFile(tableOperations.getMetadataFile(3).toString());
+
+    // Check that we got 0 findVersion, and a NoSuchTableException is thrown when trying to load the table
+    Assert.assertEquals(0, tableOperations.findVersion());
+    AssertHelpers.assertThrows(
+        "Should not be able to find the table",
+        NoSuchTableException.class,
+        "Table does not exist",
+        () -> TABLES.load(tableLocation));
+  }
+
+  private static void addVersionsToTable(Table table) {
     DataFile dataFile1 = DataFiles.builder(SPEC)
         .withPath("/a.parquet")
         .withFileSizeInBytes(10)
@@ -472,45 +540,5 @@
 
     table.newAppend().appendFile(dataFile1).commit();
     table.newAppend().appendFile(dataFile2).commit();
-    long secondSnapshotId = table.currentSnapshot().snapshotId();
-
-    // Get the version-hint.text file location
-    String versionHintLocation = tableOperations.versionHintFile().toString();
-
-    // Write old data to confirm that we are writing the correct file
-    FileIO io = new HadoopFileIO(conf);
-    io.deleteFile(versionHintLocation);
-    try (PositionOutputStream stream = io.newOutputFile(versionHintLocation).create()) {
-      stream.write("1".getBytes(StandardCharsets.UTF_8));
-    }
-
-    // Check the result of the readVersionHint(), and load the table and check the current snapshotId
-    Assert.assertEquals(1, tableOperations.readVersionHint());
-    Assert.assertEquals(secondSnapshotId, catalog.loadTable(tableId).currentSnapshot().snapshotId());
-
-    // Write newer data to confirm that we are writing the correct file
-    io.deleteFile(versionHintLocation);
-    try (PositionOutputStream stream = io.newOutputFile(versionHintLocation).create()) {
-      stream.write("3".getBytes(StandardCharsets.UTF_8));
-    }
-
-    // Check the result of the readVersionHint(), and load the table and check the current snapshotId
-    Assert.assertEquals(3, tableOperations.readVersionHint());
-    Assert.assertEquals(secondSnapshotId, catalog.loadTable(tableId).currentSnapshot().snapshotId());
-
-    // Write an empty version hint file
-    io.deleteFile(versionHintLocation);
-    io.newOutputFile(versionHintLocation).create().close();
-
-    // Check the result of the readVersionHint(), and load the table and check the current snapshotId
-    Assert.assertEquals(1, tableOperations.readVersionHint());
-    Assert.assertEquals(secondSnapshotId, catalog.loadTable(tableId).currentSnapshot().snapshotId());
-
-    // Just delete the file - double check that we have manipulated the correct file
-    io.deleteFile(versionHintLocation);
-
-    // Check the result of the readVersionHint(), and load the table and check the current snapshotId
-    Assert.assertEquals(1, tableOperations.readVersionHint());
-    Assert.assertEquals(secondSnapshotId, catalog.loadTable(tableId).currentSnapshot().snapshotId());
   }
 }