GEODE-3249 Validate internal client/server messages

This change leaves the security hole in place but allows you to plug
it by setting the system property

geode.disallow-internal-messages-without-credentials=true

Clients must be upgraded to the release containing this change if you
set this system property to true and client/server authentication is
enabled.  Otherwise client messages to register PDX types or
Instantiators will be rejected by the servers.

New tests have been added to perform backward-compatibility testing
with the old security implementation and the internal message command
classes have been modified to perform validation of credentials if
the system property is set to true.

(cherry picked from commit abbb359fe59ea3e74462fe48890918108a0edda3)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
index ad06dec..ad05865 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
@@ -86,16 +86,15 @@
    */
   private static final int TIMEOUT_BUFFER_FOR_CONNECTION_CLEANUP_MS = 5000;
 
-  public static final String ALLOW_INTERNAL_MESSAGES_WITHOUT_CREDENTIALS_NAME =
-      "geode.allow-internal-messages-without-credentials";
+  public static final String DISALLOW_INTERNAL_MESSAGES_WITHOUT_CREDENTIALS_NAME =
+      "geode.disallow-internal-messages-without-credentials";
 
   /**
-   * This property allows folks to perform a rolling upgrade from pre-1.2.1 to a post-1.2.1 cluster.
-   * Normally internal messages that can affect server state require credentials but pre-1.2.1 this
-   * wasn't the case. See GEODE-3249
+   * When true requires some formerly credential-less messages to carry credentials. See GEODE-3249
+   * and ServerConnection.isInternalMessage()
    */
-  private static final boolean ALLOW_INTERNAL_MESSAGES_WITHOUT_CREDENTIALS =
-      Boolean.getBoolean(ALLOW_INTERNAL_MESSAGES_WITHOUT_CREDENTIALS_NAME);
+  public static boolean allowInternalMessagesWithoutCredentials =
+      !(Boolean.getBoolean(DISALLOW_INTERNAL_MESSAGES_WITHOUT_CREDENTIALS_NAME));
 
   private Map commands;
 
@@ -780,7 +779,7 @@
         // if a subject exists for this uniqueId, binds the subject to this thread so that we can do
         // authorization later
         if (AcceptorImpl.isIntegratedSecurity()
-            && !isInternalMessage(this.requestMsg, ALLOW_INTERNAL_MESSAGES_WITHOUT_CREDENTIALS)
+            && !isInternalMessage(this.requestMsg, allowInternalMessagesWithoutCredentials)
             && this.communicationMode != Acceptor.GATEWAY_TO_GATEWAY) {
           long uniqueId = getUniqueId();
           Subject subject = this.clientUserAuths.getSubject(uniqueId);
@@ -1092,7 +1091,7 @@
         && this.handshake.getVersion().compareTo(Version.GFE_65) >= 0
         && (this.communicationMode != Acceptor.GATEWAY_TO_GATEWAY)
         && (!this.requestMsg.getAndResetIsMetaRegion())
-        && (!isInternalMessage(this.requestMsg, ALLOW_INTERNAL_MESSAGES_WITHOUT_CREDENTIALS))) {
+        && !isInternalMessage(this.requestMsg, allowInternalMessagesWithoutCredentials)) {
       setSecurityPart();
       return this.securePart;
     } else {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AddPdxEnum.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AddPdxEnum.java
index 2cb36cd..3c2026a 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AddPdxEnum.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AddPdxEnum.java
@@ -47,6 +47,11 @@
           serverConnection.getName(), clientMessage.getNumberOfParts(),
           serverConnection.getSocketString());
     }
+
+    if (!ServerConnection.allowInternalMessagesWithoutCredentials) {
+      serverConnection.getAuthzRequest();
+    }
+
     int noOfParts = clientMessage.getNumberOfParts();
 
     EnumInfo enumInfo = (EnumInfo) clientMessage.getPart(0).getObject();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AddPdxType.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AddPdxType.java
index 3feba0d..6f6ce98 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AddPdxType.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AddPdxType.java
@@ -47,6 +47,11 @@
           serverConnection.getName(), clientMessage.getNumberOfParts(),
           serverConnection.getSocketString());
     }
+
+    if (!ServerConnection.allowInternalMessagesWithoutCredentials) {
+      serverConnection.getAuthzRequest();
+    }
+
     int noOfParts = clientMessage.getNumberOfParts();
 
     PdxType type = (PdxType) clientMessage.getPart(0).getObject();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetFunctionAttribute.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetFunctionAttribute.java
index 2c1f26c..9813ad0 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetFunctionAttribute.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetFunctionAttribute.java
@@ -37,6 +37,11 @@
   public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start)
       throws IOException {
     serverConnection.setAsTrue(REQUIRES_RESPONSE);
+
+    if (!ServerConnection.allowInternalMessagesWithoutCredentials) {
+      serverConnection.getAuthzRequest();
+    }
+
     String functionId = clientMessage.getPart(0).getString();
     if (functionId == null) {
       String message =
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPDXEnumById.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPDXEnumById.java
index 15215de..77566c8 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPDXEnumById.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPDXEnumById.java
@@ -44,6 +44,11 @@
           serverConnection.getName(), clientMessage.getNumberOfParts(),
           serverConnection.getSocketString());
     }
+
+    if (!ServerConnection.allowInternalMessagesWithoutCredentials) {
+      serverConnection.getAuthzRequest();
+    }
+
     int enumId = clientMessage.getPart(0).getInt();
 
     EnumInfo result;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPDXIdForEnum.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPDXIdForEnum.java
index 40c62a7..ca2e73a 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPDXIdForEnum.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPDXIdForEnum.java
@@ -45,6 +45,10 @@
           serverConnection.getSocketString());
     }
 
+    if (!ServerConnection.allowInternalMessagesWithoutCredentials) {
+      serverConnection.getAuthzRequest();
+    }
+
     EnumInfo enumInfo = (EnumInfo) clientMessage.getPart(0).getObject();
 
     int enumId;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPDXIdForType.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPDXIdForType.java
index c1de0fc..057c567 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPDXIdForType.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPDXIdForType.java
@@ -44,6 +44,11 @@
           serverConnection.getName(), clientMessage.getNumberOfParts(),
           serverConnection.getSocketString());
     }
+
+    if (!ServerConnection.allowInternalMessagesWithoutCredentials) {
+      serverConnection.getAuthzRequest();
+    }
+
     int noOfParts = clientMessage.getNumberOfParts();
 
     PdxType type = (PdxType) clientMessage.getPart(0).getObject();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPDXTypeById.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPDXTypeById.java
index 16e9dd0..e6d7c91 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPDXTypeById.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPDXTypeById.java
@@ -44,6 +44,11 @@
           serverConnection.getName(), clientMessage.getNumberOfParts(),
           serverConnection.getSocketString());
     }
+
+    if (!ServerConnection.allowInternalMessagesWithoutCredentials) {
+      serverConnection.getAuthzRequest();
+    }
+
     int pdxId = clientMessage.getPart(0).getInt();
 
     PdxType type;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPdxEnums70.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPdxEnums70.java
index 7753584..bc7b4a6 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPdxEnums70.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPdxEnums70.java
@@ -44,6 +44,10 @@
           serverConnection.getSocketString());
     }
 
+    if (!ServerConnection.allowInternalMessagesWithoutCredentials) {
+      serverConnection.getAuthzRequest();
+    }
+
     Map<Integer, EnumInfo> enums;
     try {
       InternalCache cache = serverConnection.getCache();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPdxTypes70.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPdxTypes70.java
index c31a375..5dd51e1 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPdxTypes70.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPdxTypes70.java
@@ -44,6 +44,10 @@
           serverConnection.getSocketString());
     }
 
+    if (!ServerConnection.allowInternalMessagesWithoutCredentials) {
+      serverConnection.getAuthzRequest();
+    }
+
     Map<Integer, PdxType> types;
     try {
       InternalCache cache = serverConnection.getCache();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterDataSerializers.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterDataSerializers.java
index 053b2a8..8d9ada8 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterDataSerializers.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterDataSerializers.java
@@ -39,11 +39,17 @@
 
   public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start)
       throws IOException, ClassNotFoundException {
+    serverConnection.setAsTrue(REQUIRES_RESPONSE);
     if (logger.isDebugEnabled()) {
       logger.debug("{}: Received register dataserializer request ({} parts) from {}",
           serverConnection.getName(), clientMessage.getNumberOfParts(),
           serverConnection.getSocketString());
     }
+
+    if (!ServerConnection.allowInternalMessagesWithoutCredentials) {
+      serverConnection.getAuthzRequest();
+    }
+
     int noOfParts = clientMessage.getNumberOfParts();
 
     // 2 parts per instantiator and one eventId part
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterInstantiators.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterInstantiators.java
index df5a46c..824e6b1 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterInstantiators.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterInstantiators.java
@@ -51,11 +51,17 @@
   @Override
   public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start)
       throws IOException, ClassNotFoundException {
+    serverConnection.setAsTrue(REQUIRES_RESPONSE);
     if (logger.isDebugEnabled()) {
       logger.debug("{}: Received register instantiator request ({} parts) from {}",
           serverConnection.getName(), clientMessage.getNumberOfParts(),
           serverConnection.getSocketString());
     }
+
+    if (!ServerConnection.allowInternalMessagesWithoutCredentials) {
+      serverConnection.getAuthzRequest();
+    }
+
     int noOfParts = clientMessage.getNumberOfParts();
     // Assert parts
     Assert.assertTrue((noOfParts - 1) % 3 == 0);
diff --git a/geode-core/src/test/java/org/apache/geode/security/ClientAuthenticationDUnitTest.java b/geode-core/src/test/java/org/apache/geode/security/ClientAuthenticationDUnitTest.java
index ca7b2b6..7a8e322 100644
--- a/geode-core/src/test/java/org/apache/geode/security/ClientAuthenticationDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/security/ClientAuthenticationDUnitTest.java
@@ -14,13 +14,20 @@
  */
 package org.apache.geode.security;
 
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.geode.test.dunit.standalone.VersionManager;
 import org.apache.geode.test.junit.categories.DistributedTest;
 import org.apache.geode.test.junit.categories.FlakyTest;
 import org.apache.geode.test.junit.categories.SecurityTest;
+import org.apache.geode.test.junit.runners.CategoryWithParameterizedRunnerFactory;
 
 import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 /**
  * Test for authentication from client to server. This tests for both valid and invalid
@@ -30,7 +37,24 @@
  * @since GemFire 5.5
  */
 @Category({DistributedTest.class, SecurityTest.class})
+@RunWith(Parameterized.class)
+@Parameterized.UseParametersRunnerFactory(CategoryWithParameterizedRunnerFactory.class)
 public class ClientAuthenticationDUnitTest extends ClientAuthenticationTestCase {
+  @Parameterized.Parameters
+  public static Collection<String> data() {
+    List<String> result = VersionManager.getInstance().getVersions();
+    if (result.size() < 1) {
+      throw new RuntimeException("No older versions of Geode were found to test against");
+    } else {
+      System.out.println("running against these versions: " + result);
+    }
+    return result;
+  }
+
+  public ClientAuthenticationDUnitTest(String version) {
+    super();
+    clientVersion = version;
+  }
 
   @Test
   public void testValidCredentials() throws Exception {
diff --git a/geode-core/src/test/java/org/apache/geode/security/ClientAuthenticationPart2DUnitTest.java b/geode-core/src/test/java/org/apache/geode/security/ClientAuthenticationPart2DUnitTest.java
index f8ebe05..db2ba95 100644
--- a/geode-core/src/test/java/org/apache/geode/security/ClientAuthenticationPart2DUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/security/ClientAuthenticationPart2DUnitTest.java
@@ -16,23 +16,47 @@
 
 import static org.mockito.Mockito.*;
 
+import java.util.Collection;
+import java.util.List;
+
 import org.junit.Assert;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import org.apache.geode.internal.cache.tier.MessageType;
 import org.apache.geode.internal.cache.tier.sockets.Message;
 import org.apache.geode.internal.cache.tier.sockets.ServerConnection;
+import org.apache.geode.test.dunit.standalone.VersionManager;
 import org.apache.geode.test.junit.categories.DistributedTest;
 import org.apache.geode.test.junit.categories.SecurityTest;
+import org.apache.geode.test.junit.runners.CategoryWithParameterizedRunnerFactory;
 
 /**
  * this class contains test methods that used to be in its superclass but that test started taking
  * too long and caused dunit runs to hang
  */
 @Category({DistributedTest.class, SecurityTest.class})
+@RunWith(Parameterized.class)
+@Parameterized.UseParametersRunnerFactory(CategoryWithParameterizedRunnerFactory.class)
 public class ClientAuthenticationPart2DUnitTest extends ClientAuthenticationTestCase {
+  @Parameterized.Parameters
+  public static Collection<String> data() {
+    List<String> result = VersionManager.getInstance().getVersions();
+    if (result.size() < 1) {
+      throw new RuntimeException("No older versions of Geode were found to test against");
+    } else {
+      System.out.println("running against these versions: " + result);
+    }
+    return result;
+  }
+
+  public ClientAuthenticationPart2DUnitTest(String version) {
+    super();
+    clientVersion = version;
+  }
 
   @Test
   public void testNoCredentialsForMultipleUsers() throws Exception {
diff --git a/geode-core/src/test/java/org/apache/geode/security/ClientAuthenticationTestCase.java b/geode-core/src/test/java/org/apache/geode/security/ClientAuthenticationTestCase.java
index 0ecd72f..417bb50 100644
--- a/geode-core/src/test/java/org/apache/geode/security/ClientAuthenticationTestCase.java
+++ b/geode-core/src/test/java/org/apache/geode/security/ClientAuthenticationTestCase.java
@@ -31,20 +31,27 @@
 import javax.net.ssl.SSLException;
 import javax.net.ssl.SSLHandshakeException;
 
+import org.junit.Assert;
+import org.junit.Rule;
+
 import org.apache.geode.DataSerializer;
 import org.apache.geode.cache.client.Pool;
 import org.apache.geode.cache.client.PoolManager;
+import org.apache.geode.cache.client.ServerOperationException;
 import org.apache.geode.cache.client.internal.ExecutablePool;
 import org.apache.geode.cache.client.internal.RegisterDataSerializersOp;
 import org.apache.geode.internal.HeapDataOutputStream;
 import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.Version;
 import org.apache.geode.internal.cache.EventID;
+import org.apache.geode.internal.cache.tier.sockets.ServerConnection;
 import org.apache.geode.security.generator.CredentialGenerator;
 import org.apache.geode.security.generator.DummyCredentialGenerator;
 import org.apache.geode.test.dunit.Host;
 import org.apache.geode.test.dunit.VM;
 import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
+import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties;
+import org.apache.geode.test.dunit.standalone.VersionManager;
 
 public abstract class ClientAuthenticationTestCase extends JUnit4DistributedTestCase {
 
@@ -92,15 +99,22 @@
     }
   }
 
-
+  public String clientVersion = VersionManager.CURRENT_VERSION;
 
   @Override
   public final void postSetUp() throws Exception {
     final Host host = Host.getHost(0);
     server1 = host.getVM(0);
     server2 = host.getVM(1);
-    client1 = host.getVM(2);
-    client2 = host.getVM(3);
+    server1.invoke(() -> ServerConnection.allowInternalMessagesWithoutCredentials = false);
+    server2.invoke(() -> ServerConnection.allowInternalMessagesWithoutCredentials = false);
+    if (VersionManager.isCurrentVersion(clientVersion)) {
+      client1 = host.getVM(2);
+      client2 = host.getVM(3);
+    } else {
+      client1 = host.getVM(clientVersion, 2);
+      client2 = host.getVM(clientVersion, 3);
+    }
 
     addIgnoredException("Connection refused: connect");
 
@@ -110,6 +124,14 @@
     client2.invoke(() -> registerExpectedExceptions(clientIgnoredExceptions));
   }
 
+  @Override
+  public void postTearDown() throws Exception {
+    super.postTearDown();
+    server1.invoke(() -> ServerConnection.allowInternalMessagesWithoutCredentials = true);
+    server2.invoke(() -> ServerConnection.allowInternalMessagesWithoutCredentials = true);
+  }
+
+
   protected void doTestValidCredentials(final boolean multiUser) throws Exception {
     CredentialGenerator gen = new DummyCredentialGenerator();
     Properties extraProps = gen.getSystemProperties();
@@ -246,21 +268,25 @@
       client2.invoke(() -> doPuts(2, OTHER_EXCEPTION));
 
     } else {
-      client2.invoke(
+      client1.invoke(
           () -> createCacheClient(null, null, null, port1, port2, 0, multiUser, AUTHREQ_EXCEPTION));
 
       // Try to register a PDX type with the server
-      client2.invoke("register a PDX type", () -> {
+      client1.invoke("register a PDX type", () -> {
         HeapDataOutputStream outputStream = new HeapDataOutputStream(100, Version.CURRENT);
         try {
           DataSerializer.writeObject(new Employee(106l, "David", "Copperfield"), outputStream);
           throw new Error("operation should have been rejected");
+        } catch (ServerOperationException e) {
+          Assert.assertEquals(AuthenticationRequiredException.class, e.getCause().getClass());
         } catch (UnsupportedOperationException e) {
-          // "UnsupportedOperationException: Use Pool APIs for doing operations when
-          // multiuser-secure-mode-enabled is set to true."
+          // expected
         }
       });
 
+      client2.invoke(
+          () -> createCacheClient(null, null, null, port1, port2, 0, multiUser, AUTHREQ_EXCEPTION));
+
       // Try to register a DataSerializer with the server
       client2.invoke("register a data serializer", () -> {
         EventID eventId = InternalDataSerializer.generateEventId();
@@ -269,9 +295,10 @@
           RegisterDataSerializersOp.execute((ExecutablePool) pool,
               new DataSerializer[] {new MyDataSerializer()}, eventId);
           throw new Error("operation should have been rejected");
+        } catch (ServerOperationException e) {
+          Assert.assertEquals(AuthenticationRequiredException.class, e.getCause().getClass());
         } catch (UnsupportedOperationException e) {
-          // "UnsupportedOperationException: Use Pool APIs for doing operations when
-          // multiuser-secure-mode-enabled is set to true."
+          // expected
         }
       });
     }
diff --git a/geode-core/src/test/java/org/apache/geode/security/ClientAuthorizationDUnitTest.java b/geode-core/src/test/java/org/apache/geode/security/ClientAuthorizationDUnitTest.java
index 09aedbe..e42f889 100644
--- a/geode-core/src/test/java/org/apache/geode/security/ClientAuthorizationDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/security/ClientAuthorizationDUnitTest.java
@@ -21,6 +21,7 @@
 import static org.apache.geode.test.dunit.LogWriterUtils.*;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Properties;
@@ -28,16 +29,22 @@
 import org.apache.geode.internal.AvailablePortHelper;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import org.apache.geode.cache.operations.OperationContext.OperationCode;
+import org.apache.geode.internal.cache.tier.sockets.ServerConnection;
 import org.apache.geode.security.generator.AuthzCredentialGenerator;
 import org.apache.geode.security.generator.CredentialGenerator;
 import org.apache.geode.security.generator.DummyCredentialGenerator;
 import org.apache.geode.security.generator.XmlAuthzCredentialGenerator;
 import org.apache.geode.security.templates.UserPasswordAuthInit;
+import org.apache.geode.test.dunit.Host;
 import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.standalone.VersionManager;
 import org.apache.geode.test.junit.categories.DistributedTest;
 import org.apache.geode.test.junit.categories.SecurityTest;
+import org.apache.geode.test.junit.runners.CategoryWithParameterizedRunnerFactory;
 
 /**
  * Tests for authorization from client to server. This tests for authorization of all operations
@@ -47,7 +54,25 @@
  * @since GemFire 5.5
  */
 @Category({DistributedTest.class, SecurityTest.class})
+@RunWith(Parameterized.class)
+@Parameterized.UseParametersRunnerFactory(CategoryWithParameterizedRunnerFactory.class)
 public class ClientAuthorizationDUnitTest extends ClientAuthorizationTestCase {
+  @Parameterized.Parameters
+  public static Collection<String> data() {
+    List<String> result = VersionManager.getInstance().getVersions();
+    if (result.size() < 1) {
+      throw new RuntimeException("No older versions of Geode were found to test against");
+    } else {
+      System.out.println("running against these versions: " + result);
+    }
+    return result;
+  }
+
+  public ClientAuthorizationDUnitTest(String version) {
+    super();
+    clientVersion = version;
+  }
+
 
   @Override
   public final void preTearDownClientAuthorizationTestBase() throws Exception {
@@ -112,6 +137,13 @@
     String authInit = cGen.getAuthInit();
     String accessor = gen.getAuthorizationCallback();
 
+    // this test registers a PDX type and needs the servers to accept it without
+    // credentials in old clients
+    if (clientVersion.compareTo("130") < 0 && !VersionManager.isCurrentVersion(clientVersion)) {
+      server1.invoke(() -> ServerConnection.allowInternalMessagesWithoutCredentials = true);
+      server2.invoke(() -> ServerConnection.allowInternalMessagesWithoutCredentials = true);
+    }
+
     getLogWriter().info("testPutAllWithSecurity: Using authinit: " + authInit);
     getLogWriter().info("testPutAllWithSecurity: Using authenticator: " + authenticator);
     getLogWriter().info("testPutAllWithSecurity: Using accessor: " + accessor);
@@ -518,19 +550,7 @@
         new OperationWithAction(OperationCode.DESTROY, 3,
             OpFlags.USE_NEWVAL | OpFlags.CHECK_NOTAUTHZ, 4),
         new OperationWithAction(OperationCode.DESTROY),
-        new OperationWithAction(OperationCode.GET, 2, OpFlags.USE_OLDCONN | OpFlags.CHECK_FAIL, 4), // bruce:
-                                                                                                    // added
-                                                                                                    // check_nokey
-                                                                                                    // because
-                                                                                                    // we
-                                                                                                    // now
-                                                                                                    // bring
-                                                                                                    // tombstones
-                                                                                                    // to
-                                                                                                    // the
-                                                                                                    // client
-                                                                                                    // in
-                                                                                                    // 8.0
+        new OperationWithAction(OperationCode.GET, 2, OpFlags.USE_OLDCONN | OpFlags.CHECK_FAIL, 4),
         // Repopulate the region
         new OperationWithAction(OperationCode.PUT, 1, OpFlags.USE_NEWVAL, 4),
 
@@ -582,6 +602,7 @@
             OpFlags.USE_GET_ENTRY_IN_TX | OpFlags.CHECK_FAIL, 4),
 
         OperationWithAction.OPBLOCK_END};
+
   }
 
   private Properties getUserPassword(final String userName) {
diff --git a/geode-core/src/test/java/org/apache/geode/security/ClientAuthorizationTestCase.java b/geode-core/src/test/java/org/apache/geode/security/ClientAuthorizationTestCase.java
index a4fd365..a41a720 100644
--- a/geode-core/src/test/java/org/apache/geode/security/ClientAuthorizationTestCase.java
+++ b/geode-core/src/test/java/org/apache/geode/security/ClientAuthorizationTestCase.java
@@ -55,14 +55,17 @@
 import org.apache.geode.internal.AvailablePortHelper;
 import org.apache.geode.internal.cache.AbstractRegionEntry;
 import org.apache.geode.internal.cache.LocalRegion;
+import org.apache.geode.internal.cache.tier.sockets.ServerConnection;
 import org.apache.geode.security.generator.AuthzCredentialGenerator;
 import org.apache.geode.security.generator.AuthzCredentialGenerator.ClassCode;
 import org.apache.geode.security.generator.CredentialGenerator;
 import org.apache.geode.security.generator.DummyCredentialGenerator;
 import org.apache.geode.security.generator.XmlAuthzCredentialGenerator;
+import org.apache.geode.test.dunit.Host;
 import org.apache.geode.test.dunit.VM;
 import org.apache.geode.test.dunit.WaitCriterion;
 import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
+import org.apache.geode.test.dunit.standalone.VersionManager;
 
 /**
  * Base class for tests for authorization from client to server. It contains utility functions for
@@ -79,6 +82,8 @@
   protected static VM client1 = null;
   protected static VM client2 = null;
 
+  public String clientVersion = VersionManager.CURRENT_VERSION;
+
   protected static final String regionName = REGION_NAME; // TODO: remove
   protected static final String SUBREGION_NAME = "AuthSubregion";
 
@@ -103,10 +108,18 @@
   }
 
   private void setUpClientAuthorizationTestBase() throws Exception {
-    server1 = getHost(0).getVM(0);
-    server2 = getHost(0).getVM(1);
-    client1 = getHost(0).getVM(2);
-    client2 = getHost(0).getVM(3);
+    Host host = getHost(0);
+    server1 = host.getVM(0);
+    server2 = host.getVM(1);
+    server1.invoke(() -> ServerConnection.allowInternalMessagesWithoutCredentials = false);
+    server2.invoke(() -> ServerConnection.allowInternalMessagesWithoutCredentials = false);
+    if (VersionManager.isCurrentVersion(clientVersion)) {
+      client1 = host.getVM(2);
+      client2 = host.getVM(3);
+    } else {
+      client1 = host.getVM(clientVersion, 2);
+      client2 = host.getVM(clientVersion, 3);
+    }
     setUpIgnoredExceptions();
   }
 
@@ -155,10 +168,10 @@
   public final void postTearDown() throws Exception {}
 
   private final void tearDownClientAuthorizationTestBase() throws Exception {
-    // close the clients first
+    server1.invoke(() -> ServerConnection.allowInternalMessagesWithoutCredentials = true);
+    server2.invoke(() -> ServerConnection.allowInternalMessagesWithoutCredentials = true);
     client1.invoke(() -> closeCache());
     client2.invoke(() -> closeCache());
-    // then close the servers
     server1.invoke(() -> closeCache());
     server2.invoke(() -> closeCache());
   }
@@ -293,10 +306,6 @@
     boolean exceptionOccurred = false;
     boolean breakLoop = false;
 
-    if (op.isGet() || op.isContainsKey() || op.isKeySet() || op.isQuery() || op.isExecuteCQ()) {
-      Thread.sleep(PAUSE);
-    }
-
     for (int indexIndex = 0; indexIndex < numOps; ++indexIndex) {
       if (breakLoop) {
         break;
diff --git a/geode-core/src/test/java/org/apache/geode/security/SecurityTestUtils.java b/geode-core/src/test/java/org/apache/geode/security/SecurityTestUtils.java
index e69f36d..828210c 100644
--- a/geode-core/src/test/java/org/apache/geode/security/SecurityTestUtils.java
+++ b/geode-core/src/test/java/org/apache/geode/security/SecurityTestUtils.java
@@ -991,6 +991,8 @@
         fail("Expected a NotAuthorizedException while executing sizeOnServer");
       }
       assertEquals(size, sizeOnServer);
+    } catch (NoSuchMethodError ex) {
+      // expected with backward-compatibility tests
     } catch (Exception ex) {
       fail("Got unexpected exception when executing sizeOnServer", ex);
     }
@@ -1004,6 +1006,8 @@
         fail("Expected a NotAuthorizedException while executing isEmptyOnServer");
       }
       assertEquals(isEmpty, isEmptyOnServer);
+    } catch (NoSuchMethodError ex) {
+      // expected with old version clients
     } catch (Exception ex) {
       fail("Got unexpected exception when executing isEmptyOnServer", ex);
     }
diff --git a/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/MonitorCQ.java b/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/MonitorCQ.java
index 5f0118b..3e6ea3d 100644
--- a/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/MonitorCQ.java
+++ b/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/MonitorCQ.java
@@ -42,6 +42,10 @@
     serverConnection.setAsTrue(REQUIRES_RESPONSE);
     serverConnection.setAsTrue(REQUIRES_CHUNKED_RESPONSE);
 
+    if (!ServerConnection.allowInternalMessagesWithoutCredentials) {
+      serverConnection.getAuthzRequest();
+    }
+
     int op = clientMessage.getPart(0).getInt();
 
     if (op < 1) {
diff --git a/geode-cq/src/test/java/org/apache/geode/security/ClientAuthorizationTwoDUnitTest.java b/geode-cq/src/test/java/org/apache/geode/security/ClientAuthorizationCQDUnitTest.java
similarity index 90%
rename from geode-cq/src/test/java/org/apache/geode/security/ClientAuthorizationTwoDUnitTest.java
rename to geode-cq/src/test/java/org/apache/geode/security/ClientAuthorizationCQDUnitTest.java
index f5f686c..83358ea 100644
--- a/geode-cq/src/test/java/org/apache/geode/security/ClientAuthorizationTwoDUnitTest.java
+++ b/geode-cq/src/test/java/org/apache/geode/security/ClientAuthorizationCQDUnitTest.java
@@ -17,12 +17,19 @@
 import static org.apache.geode.security.SecurityTestUtils.*;
 import static org.apache.geode.test.dunit.IgnoredException.*;
 
+import java.util.Collection;
+import java.util.List;
+
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import org.apache.geode.cache.operations.OperationContext.OperationCode;
+import org.apache.geode.test.dunit.standalone.VersionManager;
 import org.apache.geode.test.junit.categories.DistributedTest;
 import org.apache.geode.test.junit.categories.SecurityTest;
+import org.apache.geode.test.junit.runners.CategoryWithParameterizedRunnerFactory;
 
 /**
  * Tests for authorization from client to server. This tests for authorization of all operations
@@ -35,7 +42,24 @@
  * @since GemFire 5.5
  */
 @Category({DistributedTest.class, SecurityTest.class})
-public class ClientAuthorizationTwoDUnitTest extends ClientAuthorizationTestCase {
+@RunWith(Parameterized.class)
+@Parameterized.UseParametersRunnerFactory(CategoryWithParameterizedRunnerFactory.class)
+public class ClientAuthorizationCQDUnitTest extends ClientAuthorizationTestCase {
+  @Parameterized.Parameters
+  public static Collection<String> data() {
+    List<String> result = VersionManager.getInstance().getVersions();
+    if (result.size() < 1) {
+      throw new RuntimeException("No older versions of Geode were found to test against");
+    } else {
+      System.out.println("running against these versions: " + result);
+    }
+    return result;
+  }
+
+  public ClientAuthorizationCQDUnitTest(String version) {
+    super();
+    clientVersion = version;
+  }
 
   @Override
   public final void postSetUpClientAuthorizationTestBase() throws Exception {