HDFS-17027. RBF: Adds auto-msync support for clients connecting to routers. (#5693)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RouterObserverReadProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RouterObserverReadProxyProvider.java
new file mode 100644
index 0000000..e494e52
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RouterObserverReadProxyProvider.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.ha;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.net.URI;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.ClientGSIContext;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.ipc.AlignmentContext;
+import org.apache.hadoop.ipc.Client;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RpcInvocationHandler;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider.AUTO_MSYNC_PERIOD_KEY_PREFIX;
+import static org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider.AUTO_MSYNC_PERIOD_DEFAULT;
+
+/**
+ * A {@link org.apache.hadoop.io.retry.FailoverProxyProvider} implementation
+ * to support automatic msync-ing when using routers.
+ *
+ * This constructs a wrapper proxy around an internal one, and
+ * injects msync calls when necessary via the InvocationHandler.
+ */
+public class RouterObserverReadProxyProvider<T> extends AbstractNNFailoverProxyProvider<T> {
+  @VisibleForTesting
+  static final Logger LOG = LoggerFactory.getLogger(ObserverReadProxyProvider.class);
+
+  /** Client-side context for syncing with the NameNode server side. */
+  private final AlignmentContext alignmentContext;
+
+  /** The inner proxy provider used for active/standby failover. */
+  private final AbstractNNFailoverProxyProvider<T> innerProxy;
+
+  /** The proxy which redirects the internal one. */
+  private final ProxyInfo<T> wrapperProxy;
+
+  /**
+   * Whether reading from observer is enabled. If this is false, this proxy
+   * will not call msync.
+   */
+  private final boolean observerReadEnabled;
+
+  /**
+   * This adjusts how frequently this proxy provider should auto-msync to the
+   * Active NameNode, automatically performing an msync() call to the active
+   * to fetch the current transaction ID before submitting read requests to
+   * observer nodes. See HDFS-14211 for more description of this feature.
+   * If this is below 0, never auto-msync. If this is 0, perform an msync on
+   * every read operation. If this is above 0, perform an msync after this many
+   * ms have elapsed since the last msync.
+   */
+  private final long autoMsyncPeriodMs;
+
+  /**
+   * The time, in millisecond epoch, that the last msync operation was
+   * performed. This includes any implicit msync (any operation which is
+   * serviced by the Active NameNode).
+   */
+  private volatile long lastMsyncTimeMs = -1;
+
+  public RouterObserverReadProxyProvider(Configuration conf, URI uri, Class<T> xface,
+      HAProxyFactory<T> factory) {
+    this(conf, uri, xface, factory, new IPFailoverProxyProvider<>(conf, uri, xface, factory));
+  }
+
+  @SuppressWarnings("unchecked")
+  public RouterObserverReadProxyProvider(Configuration conf, URI uri, Class<T> xface,
+      HAProxyFactory<T> factory, AbstractNNFailoverProxyProvider<T> failoverProxy) {
+    super(conf, uri, xface, factory);
+    this.alignmentContext = new ClientGSIContext();
+    factory.setAlignmentContext(alignmentContext);
+    this.innerProxy = failoverProxy;
+
+    String proxyInfoString = "RouterObserverReadProxyProvider for " + innerProxy.getProxy();
+
+    T wrappedProxy = (T) Proxy.newProxyInstance(
+        RouterObserverReadInvocationHandler.class.getClassLoader(),
+        new Class<?>[]{xface}, new RouterObserverReadInvocationHandler());
+    this.wrapperProxy = new ProxyInfo<>(wrappedProxy, proxyInfoString);
+
+    autoMsyncPeriodMs = conf.getTimeDuration(
+        // The host of the URI is the name service ID
+        AUTO_MSYNC_PERIOD_KEY_PREFIX + "." + uri.getHost(),
+        AUTO_MSYNC_PERIOD_DEFAULT, TimeUnit.MILLISECONDS);
+
+    if (wrappedProxy instanceof ClientProtocol) {
+      this.observerReadEnabled = true;
+    } else {
+      LOG.info("Disabling observer reads for {} because the requested proxy "
+          + "class does not implement {}", uri, ClientProtocol.class.getName());
+      this.observerReadEnabled = false;
+    }
+  }
+
+
+  public AlignmentContext getAlignmentContext() {
+    return alignmentContext;
+  }
+
+  @Override
+  public ProxyInfo<T> getProxy() {
+    return wrapperProxy;
+  }
+
+  @Override
+  public void performFailover(T currentProxy) {
+    innerProxy.performFailover(currentProxy);
+  }
+
+  @Override
+  public boolean useLogicalURI() {
+    return innerProxy.useLogicalURI();
+  }
+
+  @Override
+  public void close() throws IOException {
+    innerProxy.close();
+  }
+
+  /**
+   * Return the input proxy, cast as a {@link ClientProtocol}. This catches any
+   * {@link ClassCastException} and wraps it in a more helpful message. This
+   * should ONLY be called if the caller is certain that the proxy is, in fact,
+   * a {@link ClientProtocol}.
+   */
+  private ClientProtocol getProxyAsClientProtocol(T proxy) {
+    assert proxy instanceof ClientProtocol : "BUG: Attempted to use proxy of class "
+        + proxy.getClass()
+        + " as if it was a ClientProtocol.";
+    return (ClientProtocol) proxy;
+  }
+
+  /**
+   * This will call {@link ClientProtocol#msync()} on the active NameNode
+   * (via the {@link #innerProxy}) to update the state of this client, only
+   * if at least {@link #autoMsyncPeriodMs} ms has elapsed since the last time
+   * an msync was performed.
+   *
+   * @see #autoMsyncPeriodMs
+   */
+  private void autoMsyncIfNecessary() throws IOException {
+    if (autoMsyncPeriodMs == 0) {
+      // Always msync
+      getProxyAsClientProtocol(innerProxy.getProxy().proxy).msync();
+    } else if (autoMsyncPeriodMs > 0) {
+      if (Time.monotonicNow() - lastMsyncTimeMs > autoMsyncPeriodMs) {
+        synchronized (this) {
+          // Use a synchronized block so that only one thread will msync
+          // if many operations are submitted around the same time.
+          // Re-check the entry criterion since the status may have changed
+          // while waiting for the lock.
+          if (Time.monotonicNow() - lastMsyncTimeMs > autoMsyncPeriodMs) {
+            getProxyAsClientProtocol(innerProxy.getProxy().proxy).msync();
+            lastMsyncTimeMs = Time.monotonicNow();
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Check if a method is read-only.
+   *
+   * @return whether the 'method' is a read-only operation.
+   */
+  private static boolean isRead(Method method) {
+    if (!method.isAnnotationPresent(ReadOnly.class)) {
+      return false;
+    }
+    return !method.getAnnotationsByType(ReadOnly.class)[0].activeOnly();
+  }
+
+  private class RouterObserverReadInvocationHandler implements RpcInvocationHandler {
+
+    @Override
+    public Client.ConnectionId getConnectionId() {
+      return RPC.getConnectionIdForProxy(innerProxy.getProxy().proxy);
+    }
+
+    @Override
+    public void close() throws IOException {
+      innerProxy.close();
+    }
+
+    @Override
+    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+      if (observerReadEnabled && isRead(method)) {
+        autoMsyncIfNecessary();
+      }
+
+      try {
+        return method.invoke(innerProxy.getProxy().proxy, args);
+      } catch (InvocationTargetException e) {
+        // This exception will be handled by higher layers
+        throw e.getCause();
+      }
+    }
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/pom.xml b/hadoop-hdfs-project/hadoop-hdfs-rbf/pom.xml
index c9e6945..a5bf5c1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/pom.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/pom.xml
@@ -177,6 +177,11 @@
       <artifactId>assertj-core</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.junit.jupiter</groupId>
+      <artifactId>junit-jupiter-params</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java
index 72e8f8f..437b330 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java
@@ -51,6 +51,7 @@
 import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
 import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.ha.RouterObserverReadProxyProvider;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.jupiter.api.Assertions;
@@ -58,10 +59,13 @@
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
 import org.junit.jupiter.api.TestInfo;
 
 
 public class TestObserverWithRouter {
+  private static final int NUM_NAMESERVICES = 2;
   private static final String SKIP_BEFORE_EACH_CLUSTER_STARTUP = "SkipBeforeEachClusterStartup";
   private MiniRouterDFSCluster cluster;
   private RouterContext routerContext;
@@ -102,7 +106,7 @@
           .iterator()
           .forEachRemaining(entry -> conf.set(entry.getKey(), entry.getValue()));
     }
-    cluster = new MiniRouterDFSCluster(true, 2, numberOfNamenode);
+    cluster = new MiniRouterDFSCluster(true, NUM_NAMESERVICES, numberOfNamenode);
     cluster.addNamenodeOverrides(conf);
     // Start NNs and DNs and wait until ready
     cluster.startCluster();
@@ -139,15 +143,34 @@
     routerContext  = cluster.getRandomRouter();
   }
 
-  private static Configuration getConfToEnableObserverReads() {
+  public enum ConfigSetting {
+    USE_NAMENODE_PROXY_FLAG,
+    USE_ROUTER_OBSERVER_READ_PROXY_PROVIDER
+  }
+
+  private Configuration getConfToEnableObserverReads(ConfigSetting configSetting) {
     Configuration conf = new Configuration();
-    conf.setBoolean(HdfsClientConfigKeys.DFS_RBF_OBSERVER_READ_ENABLE, true);
+    switch (configSetting) {
+    case USE_NAMENODE_PROXY_FLAG:
+      conf.setBoolean(HdfsClientConfigKeys.DFS_RBF_OBSERVER_READ_ENABLE, true);
+      break;
+    case USE_ROUTER_OBSERVER_READ_PROXY_PROVIDER:
+      conf.set(HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX +
+          "." +
+          routerContext.getRouter()
+              .getRpcServerAddress()
+              .getHostName(), RouterObserverReadProxyProvider.class.getName());
+      break;
+    default:
+      Assertions.fail("Unknown config setting: " + configSetting);
+    }
     return conf;
   }
 
-  @Test
-  public void testObserverRead() throws Exception {
-    fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads());
+  @EnumSource(ConfigSetting.class)
+  @ParameterizedTest
+  public void testObserverRead(ConfigSetting configSetting) throws Exception {
+    fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads(configSetting));
     internalTestObserverRead();
   }
 
@@ -187,13 +210,15 @@
     assertEquals("One call should be sent to observer", 1, rpcCountForObserver);
   }
 
-  @Test
+  @EnumSource(ConfigSetting.class)
+  @ParameterizedTest
   @Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP)
-  public void testObserverReadWithoutFederatedStatePropagation() throws Exception {
+  public void testObserverReadWithoutFederatedStatePropagation(ConfigSetting configSetting)
+      throws Exception {
     Configuration confOverrides = new Configuration(false);
     confOverrides.setInt(RBFConfigKeys.DFS_ROUTER_OBSERVER_FEDERATED_STATE_PROPAGATION_MAXSIZE, 0);
     startUpCluster(2, confOverrides);
-    fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads());
+    fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads(configSetting));
     List<? extends FederationNamenodeContext> namenodes = routerContext
         .getRouter().getNamenodeResolver()
         .getNamenodesForNameserviceId(cluster.getNameservices().get(0), true);
@@ -216,14 +241,16 @@
     assertEquals("No call should be sent to observer", 0, rpcCountForObserver);
   }
 
-  @Test
+  @EnumSource(ConfigSetting.class)
+  @ParameterizedTest
   @Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP)
-  public void testDisablingObserverReadUsingNameserviceOverride() throws Exception {
+  public void testDisablingObserverReadUsingNameserviceOverride(ConfigSetting configSetting)
+      throws Exception {
     // Disable observer reads using per-nameservice override
     Configuration confOverrides = new Configuration(false);
     confOverrides.set(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_OVERRIDES, "ns0");
     startUpCluster(2, confOverrides);
-    fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads());
+    fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads(configSetting));
 
     Path path = new Path("/testFile");
     fileSystem.create(path).close();
@@ -239,9 +266,10 @@
     assertEquals("Zero calls should be sent to observer", 0, rpcCountForObserver);
   }
 
-  @Test
-  public void testReadWhenObserverIsDown() throws Exception {
-    fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads());
+  @EnumSource(ConfigSetting.class)
+  @ParameterizedTest
+  public void testReadWhenObserverIsDown(ConfigSetting configSetting) throws Exception {
+    fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads(configSetting));
     Path path = new Path("/testFile1");
     // Send Create call to active
     fileSystem.create(path).close();
@@ -267,9 +295,10 @@
         rpcCountForObserver);
   }
 
-  @Test
-  public void testMultipleObserver() throws Exception {
-    fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads());
+  @EnumSource(ConfigSetting.class)
+  @ParameterizedTest
+  public void testMultipleObserver(ConfigSetting configSetting) throws Exception {
+    fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads(configSetting));
     Path path = new Path("/testFile1");
     // Send Create call to active
     fileSystem.create(path).close();
@@ -406,9 +435,10 @@
     innerCluster.shutdown();
   }
 
-  @Test
-  public void testUnavailableObserverNN() throws Exception {
-    fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads());
+  @EnumSource(ConfigSetting.class)
+  @ParameterizedTest
+  public void testUnavailableObserverNN(ConfigSetting configSetting) throws Exception {
+    fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads(configSetting));
     stopObserver(2);
 
     Path path = new Path("/testFile");
@@ -442,9 +472,10 @@
     assertTrue("There must be unavailable namenodes", hasUnavailable);
   }
 
-  @Test
-  public void testRouterMsync() throws Exception {
-    fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads());
+  @EnumSource(ConfigSetting.class)
+  @ParameterizedTest
+  public void testRouterMsync(ConfigSetting configSetting) throws Exception {
+    fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads(configSetting));
     Path path = new Path("/testFile");
 
     // Send Create call to active
@@ -464,9 +495,10 @@
         rpcCountForActive);
   }
 
-  @Test
-  public void testSingleRead() throws Exception {
-    fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads());
+  @EnumSource(ConfigSetting.class)
+  @ParameterizedTest
+  public void testSingleRead(ConfigSetting configSetting) throws Exception {
+    fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads(configSetting));
     List<? extends FederationNamenodeContext> namenodes = routerContext
         .getRouter().getNamenodeResolver()
         .getNamenodesForNameserviceId(cluster.getNameservices().get(0), true);
@@ -554,10 +586,11 @@
     Assertions.assertEquals(10L, latestFederateState.get("ns0"));
   }
 
-  @Test
-  public void testStateIdProgressionInRouter() throws Exception {
+  @EnumSource(ConfigSetting.class)
+  @ParameterizedTest
+  public void testStateIdProgressionInRouter(ConfigSetting configSetting) throws Exception {
     Path rootPath = new Path("/");
-    fileSystem  = routerContext.getFileSystem(getConfToEnableObserverReads());
+    fileSystem  = routerContext.getFileSystem(getConfToEnableObserverReads(configSetting));
     RouterStateIdContext routerStateIdContext = routerContext
         .getRouterRpcServer()
         .getRouterStateIdContext();
@@ -570,9 +603,10 @@
     assertEquals("Router's shared should have progressed.", 21, namespaceStateId.get());
   }
 
-  @Test
+  @EnumSource(ConfigSetting.class)
+  @ParameterizedTest
   @Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP)
-  public void testSharedStateInRouterStateIdContext() throws Exception {
+  public void testSharedStateInRouterStateIdContext(ConfigSetting configSetting) throws Exception {
     Path rootPath = new Path("/");
     long cleanupPeriodMs = 1000;
 
@@ -580,7 +614,7 @@
     conf.setLong(RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_POOL_CLEAN, cleanupPeriodMs);
     conf.setLong(RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_CLEAN_MS, cleanupPeriodMs / 10);
     startUpCluster(1, conf);
-    fileSystem  = routerContext.getFileSystem(getConfToEnableObserverReads());
+    fileSystem  = routerContext.getFileSystem(getConfToEnableObserverReads(configSetting));
     RouterStateIdContext routerStateIdContext = routerContext.getRouterRpcServer()
         .getRouterStateIdContext();
 
@@ -616,9 +650,10 @@
   }
 
 
-  @Test
+  @EnumSource(ConfigSetting.class)
+  @ParameterizedTest
   @Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP)
-  public void testRouterStateIdContextCleanup() throws Exception {
+  public void testRouterStateIdContextCleanup(ConfigSetting configSetting) throws Exception {
     Path rootPath = new Path("/");
     long recordExpiry = TimeUnit.SECONDS.toMillis(1);
 
@@ -626,7 +661,7 @@
     confOverride.setLong(RBFConfigKeys.FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS, recordExpiry);
 
     startUpCluster(1, confOverride);
-    fileSystem  = routerContext.getFileSystem(getConfToEnableObserverReads());
+    fileSystem  = routerContext.getFileSystem(getConfToEnableObserverReads(configSetting));
     RouterStateIdContext routerStateIdContext = routerContext.getRouterRpcServer()
         .getRouterStateIdContext();
 
@@ -645,9 +680,11 @@
     assertTrue(namespace2.isEmpty());
   }
 
-  @Test
+  @EnumSource(ConfigSetting.class)
+  @ParameterizedTest
   @Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP)
-  public void testPeriodicStateRefreshUsingActiveNamenode() throws Exception {
+  public void testPeriodicStateRefreshUsingActiveNamenode(ConfigSetting configSetting)
+      throws Exception {
     Path rootPath = new Path("/");
 
     Configuration confOverride = new Configuration(false);
@@ -655,7 +692,7 @@
     confOverride.set(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, "3s");
     startUpCluster(1, confOverride);
 
-    fileSystem  = routerContext.getFileSystem(getConfToEnableObserverReads());
+    fileSystem  = routerContext.getFileSystem(getConfToEnableObserverReads(configSetting));
     fileSystem.listStatus(rootPath);
     int initialLengthOfRootListing = fileSystem.listStatus(rootPath).length;
 
@@ -682,4 +719,156 @@
     assertEquals("List-status should show newly created directories.",
         initialLengthOfRootListing + 10, rootFolderAfterMkdir.length);
   }
+
+  @EnumSource(ConfigSetting.class)
+  @ParameterizedTest
+  public void testAutoMsyncEqualsZero(ConfigSetting configSetting) throws Exception {
+    Configuration clientConfiguration = getConfToEnableObserverReads(configSetting);
+    clientConfiguration.setLong("dfs.client.failover.observer.auto-msync-period." +
+        routerContext.getRouter().getRpcServerAddress().getHostName(), 0);
+    fileSystem = routerContext.getFileSystem(clientConfiguration);
+
+    List<? extends FederationNamenodeContext> namenodes = routerContext
+        .getRouter().getNamenodeResolver()
+        .getNamenodesForNameserviceId(cluster.getNameservices().get(0), true);
+    assertEquals("First namenode should be observer", namenodes.get(0).getState(),
+        FederationNamenodeServiceState.OBSERVER);
+    Path path = new Path("/");
+
+    long rpcCountForActive;
+    long rpcCountForObserver;
+
+    // Send read requests
+    int numListings = 15;
+    for (int i = 0; i < numListings; i++) {
+      fileSystem.listFiles(path, false);
+    }
+    fileSystem.close();
+
+    rpcCountForActive = routerContext.getRouter().getRpcServer()
+        .getRPCMetrics().getActiveProxyOps();
+
+    rpcCountForObserver = routerContext.getRouter().getRpcServer()
+        .getRPCMetrics().getObserverProxyOps();
+
+    switch (configSetting) {
+    case USE_NAMENODE_PROXY_FLAG:
+      // First read goes to active.
+      assertEquals("Calls sent to the active", 1, rpcCountForActive);
+      // The rest of the reads are sent to the observer.
+      assertEquals("Reads sent to observer", numListings - 1, rpcCountForObserver);
+      break;
+    case USE_ROUTER_OBSERVER_READ_PROXY_PROVIDER:
+      // An msync is sent to each active namenode for each read.
+      // Total msyncs will be (numListings * num_of_nameservices).
+      assertEquals("Msyncs sent to the active namenodes",
+          NUM_NAMESERVICES * numListings, rpcCountForActive);
+      // All reads should be sent of the observer.
+      assertEquals("Reads sent to observer", numListings, rpcCountForObserver);
+      break;
+    default:
+      Assertions.fail("Unknown config setting: " + configSetting);
+    }
+  }
+
+  @EnumSource(ConfigSetting.class)
+  @ParameterizedTest
+  public void testAutoMsyncNonZero(ConfigSetting configSetting) throws Exception {
+    Configuration clientConfiguration = getConfToEnableObserverReads(configSetting);
+    clientConfiguration.setLong("dfs.client.failover.observer.auto-msync-period." +
+        routerContext.getRouter().getRpcServerAddress().getHostName(), 3000);
+    fileSystem = routerContext.getFileSystem(clientConfiguration);
+
+    List<? extends FederationNamenodeContext> namenodes = routerContext
+        .getRouter().getNamenodeResolver()
+        .getNamenodesForNameserviceId(cluster.getNameservices().get(0), true);
+    assertEquals("First namenode should be observer", namenodes.get(0).getState(),
+        FederationNamenodeServiceState.OBSERVER);
+    Path path = new Path("/");
+
+    long rpcCountForActive;
+    long rpcCountForObserver;
+
+    fileSystem.listFiles(path, false);
+    fileSystem.listFiles(path, false);
+    Thread.sleep(5000);
+    fileSystem.listFiles(path, false);
+    fileSystem.close();
+
+    rpcCountForActive = routerContext.getRouter().getRpcServer()
+        .getRPCMetrics().getActiveProxyOps();
+
+    rpcCountForObserver = routerContext.getRouter().getRpcServer()
+        .getRPCMetrics().getObserverProxyOps();
+
+    switch (configSetting) {
+    case USE_NAMENODE_PROXY_FLAG:
+      // First read goes to active.
+      assertEquals("Calls sent to the active", 1, rpcCountForActive);
+      // The rest of the reads are sent to the observer.
+      assertEquals("Reads sent to observer", 2, rpcCountForObserver);
+      break;
+    case USE_ROUTER_OBSERVER_READ_PROXY_PROVIDER:
+      // 4 msyncs expected. 2 for the first read, and 2 for the third read
+      // after the auto-msync period has elapsed during the sleep.
+      assertEquals("Msyncs sent to the active namenodes",
+          4, rpcCountForActive);
+      // All three reads should be sent of the observer.
+      assertEquals("Reads sent to observer", 3, rpcCountForObserver);
+      break;
+    default:
+      Assertions.fail("Unknown config setting: " + configSetting);
+    }
+  }
+
+  @EnumSource(ConfigSetting.class)
+  @ParameterizedTest
+  public void testThatWriteDoesntBypassNeedForMsync(ConfigSetting configSetting) throws Exception {
+    Configuration clientConfiguration = getConfToEnableObserverReads(configSetting);
+    clientConfiguration.setLong("dfs.client.failover.observer.auto-msync-period." +
+        routerContext.getRouter().getRpcServerAddress().getHostName(), 3000);
+    fileSystem = routerContext.getFileSystem(clientConfiguration);
+
+    List<? extends FederationNamenodeContext> namenodes = routerContext
+        .getRouter().getNamenodeResolver()
+        .getNamenodesForNameserviceId(cluster.getNameservices().get(0), true);
+    assertEquals("First namenode should be observer", namenodes.get(0).getState(),
+        FederationNamenodeServiceState.OBSERVER);
+    Path path = new Path("/");
+
+    long rpcCountForActive;
+    long rpcCountForObserver;
+
+    fileSystem.listFiles(path, false);
+    Thread.sleep(5000);
+    fileSystem.mkdirs(new Path(path, "mkdirLocation"));
+    fileSystem.listFiles(path, false);
+    fileSystem.close();
+
+    rpcCountForActive = routerContext.getRouter().getRpcServer()
+        .getRPCMetrics().getActiveProxyOps();
+
+    rpcCountForObserver = routerContext.getRouter().getRpcServer()
+        .getRPCMetrics().getObserverProxyOps();
+
+    switch (configSetting) {
+    case USE_NAMENODE_PROXY_FLAG:
+      // First listing and mkdir go to the active.
+      assertEquals("Calls sent to the active namenodes", 2, rpcCountForActive);
+      // Second listing goes to the observer.
+      assertEquals("Read sent to observer", 1, rpcCountForObserver);
+      break;
+    case USE_ROUTER_OBSERVER_READ_PROXY_PROVIDER:
+      // 5 calls to the active namenodes expected. 4 msync and a mkdir.
+      // Each of the 2 reads results in an msync to 2 nameservices.
+      // The mkdir also goes to the active.
+      assertEquals("Calls sent to the active namenodes",
+          5, rpcCountForActive);
+      // Both reads should be sent of the observer.
+      assertEquals("Reads sent to observer", 2, rpcCountForObserver);
+      break;
+    default:
+      Assertions.fail("Unknown config setting: " + configSetting);
+    }
+  }
 }