diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
index ad06dec..ad05865 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
@@ -86,16 +86,15 @@
    */
   private static final int TIMEOUT_BUFFER_FOR_CONNECTION_CLEANUP_MS = 5000;
 
-  public static final String ALLOW_INTERNAL_MESSAGES_WITHOUT_CREDENTIALS_NAME =
-      "geode.allow-internal-messages-without-credentials";
+  public static final String DISALLOW_INTERNAL_MESSAGES_WITHOUT_CREDENTIALS_NAME =
+      "geode.disallow-internal-messages-without-credentials";
 
   /**
-   * This property allows folks to perform a rolling upgrade from pre-1.2.1 to a post-1.2.1 cluster.
-   * Normally internal messages that can affect server state require credentials but pre-1.2.1 this
-   * wasn't the case. See GEODE-3249
+   * When true requires some formerly credential-less messages to carry credentials. See GEODE-3249
+   * and ServerConnection.isInternalMessage()
    */
-  private static final boolean ALLOW_INTERNAL_MESSAGES_WITHOUT_CREDENTIALS =
-      Boolean.getBoolean(ALLOW_INTERNAL_MESSAGES_WITHOUT_CREDENTIALS_NAME);
+  public static boolean allowInternalMessagesWithoutCredentials =
+      !(Boolean.getBoolean(DISALLOW_INTERNAL_MESSAGES_WITHOUT_CREDENTIALS_NAME));
 
   private Map commands;
 
@@ -780,7 +779,7 @@
         // if a subject exists for this uniqueId, binds the subject to this thread so that we can do
         // authorization later
         if (AcceptorImpl.isIntegratedSecurity()
-            && !isInternalMessage(this.requestMsg, ALLOW_INTERNAL_MESSAGES_WITHOUT_CREDENTIALS)
+            && !isInternalMessage(this.requestMsg, allowInternalMessagesWithoutCredentials)
             && this.communicationMode != Acceptor.GATEWAY_TO_GATEWAY) {
           long uniqueId = getUniqueId();
           Subject subject = this.clientUserAuths.getSubject(uniqueId);
@@ -1092,7 +1091,7 @@
         && this.handshake.getVersion().compareTo(Version.GFE_65) >= 0
         && (this.communicationMode != Acceptor.GATEWAY_TO_GATEWAY)
         && (!this.requestMsg.getAndResetIsMetaRegion())
-        && (!isInternalMessage(this.requestMsg, ALLOW_INTERNAL_MESSAGES_WITHOUT_CREDENTIALS))) {
+        && !isInternalMessage(this.requestMsg, allowInternalMessagesWithoutCredentials)) {
       setSecurityPart();
       return this.securePart;
     } else {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AddPdxEnum.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AddPdxEnum.java
index 2cb36cd..3c2026a 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AddPdxEnum.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AddPdxEnum.java
@@ -47,6 +47,11 @@
           serverConnection.getName(), clientMessage.getNumberOfParts(),
           serverConnection.getSocketString());
     }
+
+    if (!ServerConnection.allowInternalMessagesWithoutCredentials) {
+      serverConnection.getAuthzRequest();
+    }
+
     int noOfParts = clientMessage.getNumberOfParts();
 
     EnumInfo enumInfo = (EnumInfo) clientMessage.getPart(0).getObject();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AddPdxType.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AddPdxType.java
index 3feba0d..6f6ce98 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AddPdxType.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AddPdxType.java
@@ -47,6 +47,11 @@
           serverConnection.getName(), clientMessage.getNumberOfParts(),
           serverConnection.getSocketString());
     }
+
+    if (!ServerConnection.allowInternalMessagesWithoutCredentials) {
+      serverConnection.getAuthzRequest();
+    }
+
     int noOfParts = clientMessage.getNumberOfParts();
 
     PdxType type = (PdxType) clientMessage.getPart(0).getObject();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetFunctionAttribute.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetFunctionAttribute.java
index 2c1f26c..9813ad0 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetFunctionAttribute.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetFunctionAttribute.java
@@ -37,6 +37,11 @@
   public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start)
       throws IOException {
     serverConnection.setAsTrue(REQUIRES_RESPONSE);
+
+    if (!ServerConnection.allowInternalMessagesWithoutCredentials) {
+      serverConnection.getAuthzRequest();
+    }
+
     String functionId = clientMessage.getPart(0).getString();
     if (functionId == null) {
       String message =
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPDXEnumById.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPDXEnumById.java
index 15215de..77566c8 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPDXEnumById.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPDXEnumById.java
@@ -44,6 +44,11 @@
           serverConnection.getName(), clientMessage.getNumberOfParts(),
           serverConnection.getSocketString());
     }
+
+    if (!ServerConnection.allowInternalMessagesWithoutCredentials) {
+      serverConnection.getAuthzRequest();
+    }
+
     int enumId = clientMessage.getPart(0).getInt();
 
     EnumInfo result;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPDXIdForEnum.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPDXIdForEnum.java
index 40c62a7..ca2e73a 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPDXIdForEnum.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPDXIdForEnum.java
@@ -45,6 +45,10 @@
           serverConnection.getSocketString());
     }
 
+    if (!ServerConnection.allowInternalMessagesWithoutCredentials) {
+      serverConnection.getAuthzRequest();
+    }
+
     EnumInfo enumInfo = (EnumInfo) clientMessage.getPart(0).getObject();
 
     int enumId;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPDXIdForType.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPDXIdForType.java
index c1de0fc..057c567 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPDXIdForType.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPDXIdForType.java
@@ -44,6 +44,11 @@
           serverConnection.getName(), clientMessage.getNumberOfParts(),
           serverConnection.getSocketString());
     }
+
+    if (!ServerConnection.allowInternalMessagesWithoutCredentials) {
+      serverConnection.getAuthzRequest();
+    }
+
     int noOfParts = clientMessage.getNumberOfParts();
 
     PdxType type = (PdxType) clientMessage.getPart(0).getObject();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPDXTypeById.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPDXTypeById.java
index 16e9dd0..e6d7c91 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPDXTypeById.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPDXTypeById.java
@@ -44,6 +44,11 @@
           serverConnection.getName(), clientMessage.getNumberOfParts(),
           serverConnection.getSocketString());
     }
+
+    if (!ServerConnection.allowInternalMessagesWithoutCredentials) {
+      serverConnection.getAuthzRequest();
+    }
+
     int pdxId = clientMessage.getPart(0).getInt();
 
     PdxType type;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPdxEnums70.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPdxEnums70.java
index 7753584..bc7b4a6 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPdxEnums70.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPdxEnums70.java
@@ -44,6 +44,10 @@
           serverConnection.getSocketString());
     }
 
+    if (!ServerConnection.allowInternalMessagesWithoutCredentials) {
+      serverConnection.getAuthzRequest();
+    }
+
     Map<Integer, EnumInfo> enums;
     try {
       InternalCache cache = serverConnection.getCache();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPdxTypes70.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPdxTypes70.java
index c31a375..5dd51e1 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPdxTypes70.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPdxTypes70.java
@@ -44,6 +44,10 @@
           serverConnection.getSocketString());
     }
 
+    if (!ServerConnection.allowInternalMessagesWithoutCredentials) {
+      serverConnection.getAuthzRequest();
+    }
+
     Map<Integer, PdxType> types;
     try {
       InternalCache cache = serverConnection.getCache();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterDataSerializers.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterDataSerializers.java
index 053b2a8..8d9ada8 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterDataSerializers.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterDataSerializers.java
@@ -39,11 +39,17 @@
 
   public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start)
       throws IOException, ClassNotFoundException {
+    serverConnection.setAsTrue(REQUIRES_RESPONSE);
     if (logger.isDebugEnabled()) {
       logger.debug("{}: Received register dataserializer request ({} parts) from {}",
           serverConnection.getName(), clientMessage.getNumberOfParts(),
           serverConnection.getSocketString());
     }
+
+    if (!ServerConnection.allowInternalMessagesWithoutCredentials) {
+      serverConnection.getAuthzRequest();
+    }
+
     int noOfParts = clientMessage.getNumberOfParts();
 
     // 2 parts per instantiator and one eventId part
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterInstantiators.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterInstantiators.java
index df5a46c..824e6b1 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterInstantiators.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterInstantiators.java
@@ -51,11 +51,17 @@
   @Override
   public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start)
       throws IOException, ClassNotFoundException {
+    serverConnection.setAsTrue(REQUIRES_RESPONSE);
     if (logger.isDebugEnabled()) {
       logger.debug("{}: Received register instantiator request ({} parts) from {}",
           serverConnection.getName(), clientMessage.getNumberOfParts(),
           serverConnection.getSocketString());
     }
+
+    if (!ServerConnection.allowInternalMessagesWithoutCredentials) {
+      serverConnection.getAuthzRequest();
+    }
+
     int noOfParts = clientMessage.getNumberOfParts();
     // Assert parts
     Assert.assertTrue((noOfParts - 1) % 3 == 0);
diff --git a/geode-core/src/test/java/org/apache/geode/security/ClientAuthenticationDUnitTest.java b/geode-core/src/test/java/org/apache/geode/security/ClientAuthenticationDUnitTest.java
index ca7b2b6..7a8e322 100644
--- a/geode-core/src/test/java/org/apache/geode/security/ClientAuthenticationDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/security/ClientAuthenticationDUnitTest.java
@@ -14,13 +14,20 @@
  */
 package org.apache.geode.security;
 
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.geode.test.dunit.standalone.VersionManager;
 import org.apache.geode.test.junit.categories.DistributedTest;
 import org.apache.geode.test.junit.categories.FlakyTest;
 import org.apache.geode.test.junit.categories.SecurityTest;
+import org.apache.geode.test.junit.runners.CategoryWithParameterizedRunnerFactory;
 
 import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 /**
  * Test for authentication from client to server. This tests for both valid and invalid
@@ -30,7 +37,24 @@
  * @since GemFire 5.5
  */
 @Category({DistributedTest.class, SecurityTest.class})
+@RunWith(Parameterized.class)
+@Parameterized.UseParametersRunnerFactory(CategoryWithParameterizedRunnerFactory.class)
 public class ClientAuthenticationDUnitTest extends ClientAuthenticationTestCase {
+  @Parameterized.Parameters
+  public static Collection<String> data() {
+    List<String> result = VersionManager.getInstance().getVersions();
+    if (result.size() < 1) {
+      throw new RuntimeException("No older versions of Geode were found to test against");
+    } else {
+      System.out.println("running against these versions: " + result);
+    }
+    return result;
+  }
+
+  public ClientAuthenticationDUnitTest(String version) {
+    super();
+    clientVersion = version;
+  }
 
   @Test
   public void testValidCredentials() throws Exception {
diff --git a/geode-core/src/test/java/org/apache/geode/security/ClientAuthenticationPart2DUnitTest.java b/geode-core/src/test/java/org/apache/geode/security/ClientAuthenticationPart2DUnitTest.java
index f8ebe05..db2ba95 100644
--- a/geode-core/src/test/java/org/apache/geode/security/ClientAuthenticationPart2DUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/security/ClientAuthenticationPart2DUnitTest.java
@@ -16,23 +16,47 @@
 
 import static org.mockito.Mockito.*;
 
+import java.util.Collection;
+import java.util.List;
+
 import org.junit.Assert;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import org.apache.geode.internal.cache.tier.MessageType;
 import org.apache.geode.internal.cache.tier.sockets.Message;
 import org.apache.geode.internal.cache.tier.sockets.ServerConnection;
+import org.apache.geode.test.dunit.standalone.VersionManager;
 import org.apache.geode.test.junit.categories.DistributedTest;
 import org.apache.geode.test.junit.categories.SecurityTest;
+import org.apache.geode.test.junit.runners.CategoryWithParameterizedRunnerFactory;
 
 /**
  * this class contains test methods that used to be in its superclass but that test started taking
  * too long and caused dunit runs to hang
  */
 @Category({DistributedTest.class, SecurityTest.class})
+@RunWith(Parameterized.class)
+@Parameterized.UseParametersRunnerFactory(CategoryWithParameterizedRunnerFactory.class)
 public class ClientAuthenticationPart2DUnitTest extends ClientAuthenticationTestCase {
+  @Parameterized.Parameters
+  public static Collection<String> data() {
+    List<String> result = VersionManager.getInstance().getVersions();
+    if (result.size() < 1) {
+      throw new RuntimeException("No older versions of Geode were found to test against");
+    } else {
+      System.out.println("running against these versions: " + result);
+    }
+    return result;
+  }
+
+  public ClientAuthenticationPart2DUnitTest(String version) {
+    super();
+    clientVersion = version;
+  }
 
   @Test
   public void testNoCredentialsForMultipleUsers() throws Exception {
diff --git a/geode-core/src/test/java/org/apache/geode/security/ClientAuthenticationTestCase.java b/geode-core/src/test/java/org/apache/geode/security/ClientAuthenticationTestCase.java
index 0ecd72f..417bb50 100644
--- a/geode-core/src/test/java/org/apache/geode/security/ClientAuthenticationTestCase.java
+++ b/geode-core/src/test/java/org/apache/geode/security/ClientAuthenticationTestCase.java
@@ -31,20 +31,27 @@
 import javax.net.ssl.SSLException;
 import javax.net.ssl.SSLHandshakeException;
 
+import org.junit.Assert;
+import org.junit.Rule;
+
 import org.apache.geode.DataSerializer;
 import org.apache.geode.cache.client.Pool;
 import org.apache.geode.cache.client.PoolManager;
+import org.apache.geode.cache.client.ServerOperationException;
 import org.apache.geode.cache.client.internal.ExecutablePool;
 import org.apache.geode.cache.client.internal.RegisterDataSerializersOp;
 import org.apache.geode.internal.HeapDataOutputStream;
 import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.Version;
 import org.apache.geode.internal.cache.EventID;
+import org.apache.geode.internal.cache.tier.sockets.ServerConnection;
 import org.apache.geode.security.generator.CredentialGenerator;
 import org.apache.geode.security.generator.DummyCredentialGenerator;
 import org.apache.geode.test.dunit.Host;
 import org.apache.geode.test.dunit.VM;
 import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
+import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties;
+import org.apache.geode.test.dunit.standalone.VersionManager;
 
 public abstract class ClientAuthenticationTestCase extends JUnit4DistributedTestCase {
 
@@ -92,15 +99,22 @@
     }
   }
 
-
+  public String clientVersion = VersionManager.CURRENT_VERSION;
 
   @Override
   public final void postSetUp() throws Exception {
     final Host host = Host.getHost(0);
     server1 = host.getVM(0);
     server2 = host.getVM(1);
-    client1 = host.getVM(2);
-    client2 = host.getVM(3);
+    server1.invoke(() -> ServerConnection.allowInternalMessagesWithoutCredentials = false);
+    server2.invoke(() -> ServerConnection.allowInternalMessagesWithoutCredentials = false);
+    if (VersionManager.isCurrentVersion(clientVersion)) {
+      client1 = host.getVM(2);
+      client2 = host.getVM(3);
+    } else {
+      client1 = host.getVM(clientVersion, 2);
+      client2 = host.getVM(clientVersion, 3);
+    }
 
     addIgnoredException("Connection refused: connect");
 
@@ -110,6 +124,14 @@
     client2.invoke(() -> registerExpectedExceptions(clientIgnoredExceptions));
   }
 
+  @Override
+  public void postTearDown() throws Exception {
+    super.postTearDown();
+    server1.invoke(() -> ServerConnection.allowInternalMessagesWithoutCredentials = true);
+    server2.invoke(() -> ServerConnection.allowInternalMessagesWithoutCredentials = true);
+  }
+
+
   protected void doTestValidCredentials(final boolean multiUser) throws Exception {
     CredentialGenerator gen = new DummyCredentialGenerator();
     Properties extraProps = gen.getSystemProperties();
@@ -246,21 +268,25 @@
       client2.invoke(() -> doPuts(2, OTHER_EXCEPTION));
 
     } else {
-      client2.invoke(
+      client1.invoke(
           () -> createCacheClient(null, null, null, port1, port2, 0, multiUser, AUTHREQ_EXCEPTION));
 
       // Try to register a PDX type with the server
-      client2.invoke("register a PDX type", () -> {
+      client1.invoke("register a PDX type", () -> {
         HeapDataOutputStream outputStream = new HeapDataOutputStream(100, Version.CURRENT);
         try {
           DataSerializer.writeObject(new Employee(106l, "David", "Copperfield"), outputStream);
           throw new Error("operation should have been rejected");
+        } catch (ServerOperationException e) {
+          Assert.assertEquals(AuthenticationRequiredException.class, e.getCause().getClass());
         } catch (UnsupportedOperationException e) {
-          // "UnsupportedOperationException: Use Pool APIs for doing operations when
-          // multiuser-secure-mode-enabled is set to true."
+          // expected
         }
       });
 
+      client2.invoke(
+          () -> createCacheClient(null, null, null, port1, port2, 0, multiUser, AUTHREQ_EXCEPTION));
+
       // Try to register a DataSerializer with the server
       client2.invoke("register a data serializer", () -> {
         EventID eventId = InternalDataSerializer.generateEventId();
@@ -269,9 +295,10 @@
           RegisterDataSerializersOp.execute((ExecutablePool) pool,
               new DataSerializer[] {new MyDataSerializer()}, eventId);
           throw new Error("operation should have been rejected");
+        } catch (ServerOperationException e) {
+          Assert.assertEquals(AuthenticationRequiredException.class, e.getCause().getClass());
         } catch (UnsupportedOperationException e) {
-          // "UnsupportedOperationException: Use Pool APIs for doing operations when
-          // multiuser-secure-mode-enabled is set to true."
+          // expected
         }
       });
     }
diff --git a/geode-core/src/test/java/org/apache/geode/security/ClientAuthorizationDUnitTest.java b/geode-core/src/test/java/org/apache/geode/security/ClientAuthorizationDUnitTest.java
index 09aedbe..e42f889 100644
--- a/geode-core/src/test/java/org/apache/geode/security/ClientAuthorizationDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/security/ClientAuthorizationDUnitTest.java
@@ -21,6 +21,7 @@
 import static org.apache.geode.test.dunit.LogWriterUtils.*;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Properties;
@@ -28,16 +29,22 @@
 import org.apache.geode.internal.AvailablePortHelper;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import org.apache.geode.cache.operations.OperationContext.OperationCode;
+import org.apache.geode.internal.cache.tier.sockets.ServerConnection;
 import org.apache.geode.security.generator.AuthzCredentialGenerator;
 import org.apache.geode.security.generator.CredentialGenerator;
 import org.apache.geode.security.generator.DummyCredentialGenerator;
 import org.apache.geode.security.generator.XmlAuthzCredentialGenerator;
 import org.apache.geode.security.templates.UserPasswordAuthInit;
+import org.apache.geode.test.dunit.Host;
 import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.standalone.VersionManager;
 import org.apache.geode.test.junit.categories.DistributedTest;
 import org.apache.geode.test.junit.categories.SecurityTest;
+import org.apache.geode.test.junit.runners.CategoryWithParameterizedRunnerFactory;
 
 /**
  * Tests for authorization from client to server. This tests for authorization of all operations
@@ -47,7 +54,25 @@
  * @since GemFire 5.5
  */
 @Category({DistributedTest.class, SecurityTest.class})
+@RunWith(Parameterized.class)
+@Parameterized.UseParametersRunnerFactory(CategoryWithParameterizedRunnerFactory.class)
 public class ClientAuthorizationDUnitTest extends ClientAuthorizationTestCase {
+  @Parameterized.Parameters
+  public static Collection<String> data() {
+    List<String> result = VersionManager.getInstance().getVersions();
+    if (result.size() < 1) {
+      throw new RuntimeException("No older versions of Geode were found to test against");
+    } else {
+      System.out.println("running against these versions: " + result);
+    }
+    return result;
+  }
+
+  public ClientAuthorizationDUnitTest(String version) {
+    super();
+    clientVersion = version;
+  }
+
 
   @Override
   public final void preTearDownClientAuthorizationTestBase() throws Exception {
@@ -112,6 +137,13 @@
     String authInit = cGen.getAuthInit();
     String accessor = gen.getAuthorizationCallback();
 
+    // this test registers a PDX type and needs the servers to accept it without
+    // credentials in old clients
+    if (clientVersion.compareTo("130") < 0 && !VersionManager.isCurrentVersion(clientVersion)) {
+      server1.invoke(() -> ServerConnection.allowInternalMessagesWithoutCredentials = true);
+      server2.invoke(() -> ServerConnection.allowInternalMessagesWithoutCredentials = true);
+    }
+
     getLogWriter().info("testPutAllWithSecurity: Using authinit: " + authInit);
     getLogWriter().info("testPutAllWithSecurity: Using authenticator: " + authenticator);
     getLogWriter().info("testPutAllWithSecurity: Using accessor: " + accessor);
@@ -518,19 +550,7 @@
         new OperationWithAction(OperationCode.DESTROY, 3,
             OpFlags.USE_NEWVAL | OpFlags.CHECK_NOTAUTHZ, 4),
         new OperationWithAction(OperationCode.DESTROY),
-        new OperationWithAction(OperationCode.GET, 2, OpFlags.USE_OLDCONN | OpFlags.CHECK_FAIL, 4), // bruce:
-                                                                                                    // added
-                                                                                                    // check_nokey
-                                                                                                    // because
-                                                                                                    // we
-                                                                                                    // now
-                                                                                                    // bring
-                                                                                                    // tombstones
-                                                                                                    // to
-                                                                                                    // the
-                                                                                                    // client
-                                                                                                    // in
-                                                                                                    // 8.0
+        new OperationWithAction(OperationCode.GET, 2, OpFlags.USE_OLDCONN | OpFlags.CHECK_FAIL, 4),
         // Repopulate the region
         new OperationWithAction(OperationCode.PUT, 1, OpFlags.USE_NEWVAL, 4),
 
@@ -582,6 +602,7 @@
             OpFlags.USE_GET_ENTRY_IN_TX | OpFlags.CHECK_FAIL, 4),
 
         OperationWithAction.OPBLOCK_END};
+
   }
 
   private Properties getUserPassword(final String userName) {
diff --git a/geode-core/src/test/java/org/apache/geode/security/ClientAuthorizationTestCase.java b/geode-core/src/test/java/org/apache/geode/security/ClientAuthorizationTestCase.java
index a4fd365..a41a720 100644
--- a/geode-core/src/test/java/org/apache/geode/security/ClientAuthorizationTestCase.java
+++ b/geode-core/src/test/java/org/apache/geode/security/ClientAuthorizationTestCase.java
@@ -55,14 +55,17 @@
 import org.apache.geode.internal.AvailablePortHelper;
 import org.apache.geode.internal.cache.AbstractRegionEntry;
 import org.apache.geode.internal.cache.LocalRegion;
+import org.apache.geode.internal.cache.tier.sockets.ServerConnection;
 import org.apache.geode.security.generator.AuthzCredentialGenerator;
 import org.apache.geode.security.generator.AuthzCredentialGenerator.ClassCode;
 import org.apache.geode.security.generator.CredentialGenerator;
 import org.apache.geode.security.generator.DummyCredentialGenerator;
 import org.apache.geode.security.generator.XmlAuthzCredentialGenerator;
+import org.apache.geode.test.dunit.Host;
 import org.apache.geode.test.dunit.VM;
 import org.apache.geode.test.dunit.WaitCriterion;
 import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
+import org.apache.geode.test.dunit.standalone.VersionManager;
 
 /**
  * Base class for tests for authorization from client to server. It contains utility functions for
@@ -79,6 +82,8 @@
   protected static VM client1 = null;
   protected static VM client2 = null;
 
+  public String clientVersion = VersionManager.CURRENT_VERSION;
+
   protected static final String regionName = REGION_NAME; // TODO: remove
   protected static final String SUBREGION_NAME = "AuthSubregion";
 
@@ -103,10 +108,18 @@
   }
 
   private void setUpClientAuthorizationTestBase() throws Exception {
-    server1 = getHost(0).getVM(0);
-    server2 = getHost(0).getVM(1);
-    client1 = getHost(0).getVM(2);
-    client2 = getHost(0).getVM(3);
+    Host host = getHost(0);
+    server1 = host.getVM(0);
+    server2 = host.getVM(1);
+    server1.invoke(() -> ServerConnection.allowInternalMessagesWithoutCredentials = false);
+    server2.invoke(() -> ServerConnection.allowInternalMessagesWithoutCredentials = false);
+    if (VersionManager.isCurrentVersion(clientVersion)) {
+      client1 = host.getVM(2);
+      client2 = host.getVM(3);
+    } else {
+      client1 = host.getVM(clientVersion, 2);
+      client2 = host.getVM(clientVersion, 3);
+    }
     setUpIgnoredExceptions();
   }
 
@@ -155,10 +168,10 @@
   public final void postTearDown() throws Exception {}
 
   private final void tearDownClientAuthorizationTestBase() throws Exception {
-    // close the clients first
+    server1.invoke(() -> ServerConnection.allowInternalMessagesWithoutCredentials = true);
+    server2.invoke(() -> ServerConnection.allowInternalMessagesWithoutCredentials = true);
     client1.invoke(() -> closeCache());
     client2.invoke(() -> closeCache());
-    // then close the servers
     server1.invoke(() -> closeCache());
     server2.invoke(() -> closeCache());
   }
@@ -293,10 +306,6 @@
     boolean exceptionOccurred = false;
     boolean breakLoop = false;
 
-    if (op.isGet() || op.isContainsKey() || op.isKeySet() || op.isQuery() || op.isExecuteCQ()) {
-      Thread.sleep(PAUSE);
-    }
-
     for (int indexIndex = 0; indexIndex < numOps; ++indexIndex) {
       if (breakLoop) {
         break;
diff --git a/geode-core/src/test/java/org/apache/geode/security/SecurityTestUtils.java b/geode-core/src/test/java/org/apache/geode/security/SecurityTestUtils.java
index e69f36d..828210c 100644
--- a/geode-core/src/test/java/org/apache/geode/security/SecurityTestUtils.java
+++ b/geode-core/src/test/java/org/apache/geode/security/SecurityTestUtils.java
@@ -991,6 +991,8 @@
         fail("Expected a NotAuthorizedException while executing sizeOnServer");
       }
       assertEquals(size, sizeOnServer);
+    } catch (NoSuchMethodError ex) {
+      // expected with backward-compatibility tests
     } catch (Exception ex) {
       fail("Got unexpected exception when executing sizeOnServer", ex);
     }
@@ -1004,6 +1006,8 @@
         fail("Expected a NotAuthorizedException while executing isEmptyOnServer");
       }
       assertEquals(isEmpty, isEmptyOnServer);
+    } catch (NoSuchMethodError ex) {
+      // expected with old version clients
     } catch (Exception ex) {
       fail("Got unexpected exception when executing isEmptyOnServer", ex);
     }
diff --git a/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/MonitorCQ.java b/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/MonitorCQ.java
index 5f0118b..3e6ea3d 100644
--- a/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/MonitorCQ.java
+++ b/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/MonitorCQ.java
@@ -42,6 +42,10 @@
     serverConnection.setAsTrue(REQUIRES_RESPONSE);
     serverConnection.setAsTrue(REQUIRES_CHUNKED_RESPONSE);
 
+    if (!ServerConnection.allowInternalMessagesWithoutCredentials) {
+      serverConnection.getAuthzRequest();
+    }
+
     int op = clientMessage.getPart(0).getInt();
 
     if (op < 1) {
diff --git a/geode-cq/src/test/java/org/apache/geode/security/ClientAuthorizationTwoDUnitTest.java b/geode-cq/src/test/java/org/apache/geode/security/ClientAuthorizationCQDUnitTest.java
similarity index 90%
rename from geode-cq/src/test/java/org/apache/geode/security/ClientAuthorizationTwoDUnitTest.java
rename to geode-cq/src/test/java/org/apache/geode/security/ClientAuthorizationCQDUnitTest.java
index f5f686c..83358ea 100644
--- a/geode-cq/src/test/java/org/apache/geode/security/ClientAuthorizationTwoDUnitTest.java
+++ b/geode-cq/src/test/java/org/apache/geode/security/ClientAuthorizationCQDUnitTest.java
@@ -17,12 +17,19 @@
 import static org.apache.geode.security.SecurityTestUtils.*;
 import static org.apache.geode.test.dunit.IgnoredException.*;
 
+import java.util.Collection;
+import java.util.List;
+
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import org.apache.geode.cache.operations.OperationContext.OperationCode;
+import org.apache.geode.test.dunit.standalone.VersionManager;
 import org.apache.geode.test.junit.categories.DistributedTest;
 import org.apache.geode.test.junit.categories.SecurityTest;
+import org.apache.geode.test.junit.runners.CategoryWithParameterizedRunnerFactory;
 
 /**
  * Tests for authorization from client to server. This tests for authorization of all operations
@@ -35,7 +42,24 @@
  * @since GemFire 5.5
  */
 @Category({DistributedTest.class, SecurityTest.class})
-public class ClientAuthorizationTwoDUnitTest extends ClientAuthorizationTestCase {
+@RunWith(Parameterized.class)
+@Parameterized.UseParametersRunnerFactory(CategoryWithParameterizedRunnerFactory.class)
+public class ClientAuthorizationCQDUnitTest extends ClientAuthorizationTestCase {
+  @Parameterized.Parameters
+  public static Collection<String> data() {
+    List<String> result = VersionManager.getInstance().getVersions();
+    if (result.size() < 1) {
+      throw new RuntimeException("No older versions of Geode were found to test against");
+    } else {
+      System.out.println("running against these versions: " + result);
+    }
+    return result;
+  }
+
+  public ClientAuthorizationCQDUnitTest(String version) {
+    super();
+    clientVersion = version;
+  }
 
   @Override
   public final void postSetUpClientAuthorizationTestBase() throws Exception {
