ZOOKEEPER-3937: C client: avoid out-of-order packets during SASL negotiation

This patch prevents SASL-enabled C clients from sending request packets for as long as the SASL negociation is not complete.

"Ideally," applications should wait for `ZOO_CONNECTED_STATE` or `ZOO_READONLY_STATE` events before issuing requests, but this is not what happens in the wild.

Without this patch, it was easy to cause a desynchronization by creating a handle and blasting requests right away.  (The added test simply avoids doing a `ctx.waitForConnected()`.)

Author: Damien Diederen <dd@crosstwine.com>

Reviewers: Enrico Olivelli <eolivelli@apache.org>, Mate Szalay-Beko <symat@apache.org>

Closes #1457 from ztzg/ZOOKEEPER-3937-c-client-out-of-order-packets
diff --git a/zookeeper-client/zookeeper-client-c/src/zookeeper.c b/zookeeper-client/zookeeper-client-c/src/zookeeper.c
index 8497055..ec42e78 100644
--- a/zookeeper-client/zookeeper-client-c/src/zookeeper.c
+++ b/zookeeper-client/zookeeper-client-c/src/zookeeper.c
@@ -344,6 +344,38 @@
 #endif /* HAVE_CYRUS_SASL_H */
 }
 
+/*
+ * Extract the type field (ZOO_*_OP) of a serialized RequestHeader.
+ *
+ * (This is not the most efficient way of fetching 4 bytes, but it is
+ * currently only used during SASL negotiation.)
+ *
+ * \param buffer the buffer to extract the request type from.  Must
+ *   start with a serialized RequestHeader;
+ * \param len the buffer length.  Must be positive.
+ * \param out_type out parameter; pointer to the location where the
+ *   extracted type is to be stored.  Cannot be NULL.
+ * \return ZOK on success, or < 0 if something went wrong
+ */
+static int extract_request_type(char *buffer, int len, int32_t *out_type)
+{
+    struct iarchive *ia;
+    struct RequestHeader h;
+    int rc;
+
+    ia = create_buffer_iarchive(buffer, len);
+    rc = ia ? ZOK : ZSYSTEMERROR;
+    rc = rc < 0 ? rc : deserialize_RequestHeader(ia, "header", &h);
+    deallocate_RequestHeader(&h);
+    if (ia) {
+        close_buffer_iarchive(&ia);
+    }
+
+    *out_type = h.type;
+
+    return rc;
+}
+
 #ifndef THREADED
 /*
  * abort due to the use of a sync api in a singlethreaded environment
@@ -2812,6 +2844,11 @@
     LOG_DEBUG(LOGCALLBACK(zh), "Calling a watcher for a ZOO_SESSION_EVENT and the state=ZOO_CONNECTED_STATE");
     zh->input_buffer = 0; // just in case the watcher calls zookeeper_process() again
     PROCESS_SESSION_EVENT(zh, zh->state);
+
+    if (zh->sasl_client) {
+        /* some packets might have been delayed during SASL negotiaton. */
+        adaptor_send_queue(zh, 0);
+    }
 }
 
 #ifdef HAVE_CYRUS_SASL_H
@@ -4855,6 +4892,20 @@
     // successful
     lock_buffer_list(&zh->to_send);
     while (zh->to_send.head != 0 && (is_connected(zh) || is_sasl_auth_in_progress(zh))) {
+        if (is_sasl_auth_in_progress(zh)) {
+            // We don't let non-SASL packets escape as long as
+            // negotiation is not complete.  (SASL packets are always
+            // pushed to the front of the queue.)
+            buffer_list_t *buff = zh->to_send.head;
+            int32_t type;
+
+            rc = extract_request_type(buff->buffer, buff->len, &type);
+
+            if (rc < 0 || type != ZOO_SASL_OP) {
+                break;
+            }
+        }
+
         if(timeout!=0){
 #ifndef _WIN32
             struct pollfd fds;
@@ -5020,8 +5071,14 @@
     add_last_auth(&zh->auth_h, authinfo);
     zoo_unlock_auth(zh);
 
-    if (is_connected(zh) || zh->state == ZOO_ASSOCIATING_STATE)
+    if (is_connected(zh) ||
+        // When associating, only send info packets if no SASL
+        // negotiation is planned.  (Such packets would be queued in
+        // front of SASL packets, which is forbidden, and SASL
+        // completion is followed by a 'send_auth_info' anyway.)
+        (zh->state == ZOO_ASSOCIATING_STATE && !zh->sasl_client)) {
         return send_last_auth_info(zh);
+    }
 
     return ZOK;
 }
diff --git a/zookeeper-client/zookeeper-client-c/tests/TestSASLAuth.cc b/zookeeper-client/zookeeper-client-c/tests/TestSASLAuth.cc
index 080649d..e6aa4cb 100644
--- a/zookeeper-client/zookeeper-client-c/tests/TestSASLAuth.cc
+++ b/zookeeper-client/zookeeper-client-c/tests/TestSASLAuth.cc
@@ -38,6 +38,7 @@
     CPPUNIT_TEST(testClientSASLOverIPv6);
 #endif/* ZOO_IPV6_ENABLED */
     CPPUNIT_TEST(testClientSASLReadOnly);
+    CPPUNIT_TEST(testClientSASLPacketOrder);
 #endif /* HAVE_CYRUS_SASL_H */
     CPPUNIT_TEST_SUITE_END();
     FILE *logfile;
@@ -227,6 +228,38 @@
         stopServer();
     }
 
+    void testClientSASLPacketOrder() {
+        startServer();
+
+        // Initialize Cyrus SASL.
+        CPPUNIT_ASSERT_EQUAL(sasl_client_init(NULL), SASL_OK);
+
+        // Initialize SASL parameters.
+        zoo_sasl_params_t sasl_params = { 0 };
+
+        sasl_params.service = "zookeeper";
+        sasl_params.host = "zk-sasl-md5";
+        sasl_params.mechlist = "DIGEST-MD5";
+        sasl_params.callbacks = zoo_sasl_make_basic_callbacks(
+            "myuser", NULL, "Zookeeper_SASLAuth.password");
+
+        // Connect.
+        watchctx_t ctx;
+        int rc = 0;
+        zhandle_t *zk = zookeeper_init_sasl(hostPorts, watcher, 10000, NULL,
+            &ctx, /*flags*/0, /*log_callback*/NULL, &sasl_params);
+        ctx.zh = zk;
+        CPPUNIT_ASSERT(zk);
+
+        // No wait: try and queue a packet before SASL auth is complete.
+        char buf[1024];
+        int len = sizeof(buf);
+        rc = zoo_get(zk, "/", 0, buf, &len, 0);
+        CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
+
+        stopServer();
+    }
+
 #endif /* HAVE_CYRUS_SASL_H */
 };