IMPALA-9122: Ignore FileNotFoundException when loading a table

While the file metadata of a table or partition is being loaded,
external engines such as Hive may delete temporary files (for example,
those in the .hive-staging directory). This causes a
FileNotFoundException during the load, which fails the reload command.
In general, this is not a problem, since users are careful not to
modify a table from Hive or Spark while Impala is reading it. In the
worst case, the refresh command currently fails and the user can retry
it. However, this does not work well when event processing is turned
on. EventProcessor tries to reload the table as soon as it sees an
INSERT_EVENT from the metastore. Hive may still be cleaning up the
staging directories when EventProcessor issues the reload, causing
EventProcessor to go into an error state.

Ideally, we should have some sort of inter-engine synchronization
semantics to avoid such issues, but that is a much more complex
architectural change. For now, we ignore such errors and skip the
deleted files instead of loading them.
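
The skip-on-error approach can be illustrated with a minimal,
self-contained sketch. It is not the code from this patch:
ThrowingIterator, simulatedListing and listSurviving are hypothetical
names, and ThrowingIterator only stands in for Hadoop's RemoteIterator.
The sketch also assumes that the underlying iterator advances past an
entry even when reading it throws; otherwise the loop would not
terminate.

```java
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class SkipMissingDemo {
  /** Hypothetical stand-in for Hadoop's RemoteIterator: both methods may throw. */
  interface ThrowingIterator<T> {
    boolean hasNext() throws IOException;
    T next() throws IOException;
  }

  /** Simulated listing: names starting with "!" vanish before they can be read. */
  static ThrowingIterator<String> simulatedListing(List<String> names) {
    Iterator<String> it = names.iterator();
    return new ThrowingIterator<String>() {
      public boolean hasNext() { return it.hasNext(); }
      public String next() throws IOException {
        String name = it.next();  // always advances, even when we throw below
        if (name.startsWith("!")) throw new FileNotFoundException(name.substring(1));
        return name;
      }
    };
  }

  /** Collects every entry that still exists, skipping those deleted mid-listing. */
  static List<String> listSurviving(ThrowingIterator<String> base) throws IOException {
    List<String> result = new ArrayList<>();
    while (true) {
      String next;
      try {
        if (!base.hasNext()) return result;
        next = base.next();
      } catch (FileNotFoundException ex) {
        // The entry disappeared between listing and reading; log it and move on.
        System.err.println("Skipping deleted file: " + ex.getMessage());
        continue;
      }
      result.add(next);
    }
  }

  public static void main(String[] args) throws IOException {
    List<String> names = Arrays.asList("part-0", "!.hive-staging/tmp-1", "part-1");
    System.out.println(listSurviving(simulatedListing(names)));
  }
}
```

Only FileNotFoundException is swallowed here; any other IOException
still propagates to the caller, so genuine listing failures are not
hidden.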

Testing: Unfortunately, this error is hard to reproduce locally. I
tried creating multiple threads that delete files while multiple
FileMetadataLoaders load concurrently, but could not trigger the
failure. I also ran TestEventProcessing.test_insert_events in a loop
for more than an hour and did not see any failures.

Change-Id: Iecf6b193b0d57de27d41ad6ef6e1719005d9e908
Reviewed-on: http://gerrit.cloudera.org:8080/14806
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
diff --git a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
index f2de972..7eccd13 100644
--- a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
+++ b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
@@ -676,9 +676,20 @@
       // we don't need to do anything (extra calls to hasNext() must not affect
       // state)
       while (curFile_ == null) {
-        if (!baseIterator_.hasNext()) return false;
-        // if the next fileStatus is in ignored directory skip it
-        FileStatus next = baseIterator_.next();
+        FileStatus next;
+        try {
+          if (!baseIterator_.hasNext()) return false;
+          // if the next fileStatus is in ignored directory skip it
+          next = baseIterator_.next();
+        } catch (FileNotFoundException ex) {
+          // in case of concurrent operations by multiple engines it is possible that
+          // some temporary files are deleted while Impala is loading the table. For
+          // instance, hive deletes the temporary files in the .hive-staging directory
+          // after an insert query from Hive completes. If we are loading the table at
+          // the same time, we may get a FileNotFoundException which is safe to ignore.
+          LOG.warn(ex.getMessage());
+          continue;
+        }
         if (!isInIgnoredDirectory(startPath_, next)) {
           curFile_ = next;
           return true;
@@ -702,13 +713,13 @@
    * Iterator which recursively visits directories on a FileSystem, yielding
    * files in an unspecified order.
    */
-  static class RecursingIterator implements RemoteIterator<FileStatus> {
+  private static class RecursingIterator implements RemoteIterator<FileStatus> {
     private final FileSystem fs_;
     private final Stack<RemoteIterator<FileStatus>> iters_ = new Stack<>();
     private RemoteIterator<FileStatus> curIter_;
     private FileStatus curFile_;
 
-    RecursingIterator(FileSystem fs, Path startPath) throws IOException {
+    private RecursingIterator(FileSystem fs, Path startPath) throws IOException {
       this.fs_ = Preconditions.checkNotNull(fs);
       curIter_ = fs.listStatusIterator(Preconditions.checkNotNull(startPath));
     }
diff --git a/tests/custom_cluster/test_event_processing.py b/tests/custom_cluster/test_event_processing.py
index 432d421..0b7103b 100644
--- a/tests/custom_cluster/test_event_processing.py
+++ b/tests/custom_cluster/test_event_processing.py
@@ -69,7 +69,7 @@
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=2")
   @SkipIfHive2.acid
-  def test_insert_events_transactional(self):
+  def test_transactional_insert_events(self):
     """Executes 'run_test_insert_events' for transactional tables.
     """
     self.run_test_insert_events(is_transactional=True)