/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <cppunit/extensions/HelperMacros.h>
#include "CppAssertHelper.h"

#include "ZKMocks.h"
#include <proto.h>

using namespace std;

class Zookeeper_operations : public CPPUNIT_NS::TestFixture
{
    CPPUNIT_TEST_SUITE(Zookeeper_operations);
#ifndef THREADED
    CPPUNIT_TEST(testPing);
    CPPUNIT_TEST(testUnsolicitedPing);
    CPPUNIT_TEST(testTimeoutCausedByWatches1);
    CPPUNIT_TEST(testTimeoutCausedByWatches2);
    CPPUNIT_TEST(testCloseWhileInProgressFromMain);
    CPPUNIT_TEST(testCloseWhileInProgressFromCompletion);
    CPPUNIT_TEST(testCloseWhileMultiInProgressFromMain);
    CPPUNIT_TEST(testCloseWhileMultiInProgressFromCompletion);
#else    
    CPPUNIT_TEST(testAsyncWatcher1);
    CPPUNIT_TEST(testAsyncGetOperation);
#endif
    CPPUNIT_TEST(testOperationsAndDisconnectConcurrently1);
    CPPUNIT_TEST(testOperationsAndDisconnectConcurrently2);
    CPPUNIT_TEST(testConcurrentOperations1);
    CPPUNIT_TEST_SUITE_END();
    zhandle_t *zh;
    FILE *logfile;

    static void watcher(zhandle_t *, int, int, const char *,void*){}
public: 
    Zookeeper_operations() {
      logfile = openlogfile("Zookeeper_operations");
    }

    ~Zookeeper_operations() {
      if (logfile) {
        fflush(logfile);
        fclose(logfile);
        logfile = 0;
      }
    }

    void setUp()
    {
        zoo_set_log_stream(logfile);

        zoo_deterministic_conn_order(0);
        zh=0;
    }
    
    void tearDown()
    {
        zookeeper_close(zh);
    }

    class AsyncGetOperationCompletion: public AsyncCompletion{
    public:
        AsyncGetOperationCompletion():called_(false),rc_(ZAPIERROR){}
        virtual void dataCompl(int rc, const char *value, int len, const Stat *stat){
            synchronized(mx_);
            called_=true;
            rc_=rc;
            value_.erase();
            if(rc!=ZOK) return;
            value_.assign(value,len);
            if(stat)
                stat_=*stat;
        }
        bool operator()()const{
            synchronized(mx_);
            return called_;
        }
        mutable Mutex mx_;
        bool called_;
        int rc_;
        string value_;
        NodeStat stat_;
    };

    class AsyncVoidOperationCompletion: public AsyncCompletion{
    public:
        AsyncVoidOperationCompletion():called_(false),rc_(ZAPIERROR){}
        virtual void voidCompl(int rc){
            synchronized(mx_);
            called_=true;
            rc_=rc;
        }
        bool operator()()const{
            synchronized(mx_);
            return called_;
        }
        mutable Mutex mx_;
        bool called_;
        int rc_;
    };
#ifndef THREADED
    // send two get data requests; verify that the corresponding completions called
    void testConcurrentOperations1()
    {
        Mock_gettimeofday timeMock;
        ZookeeperServer zkServer;
        // must call zookeeper_close() while all the mocks are in scope
        CloseFinally guard(&zh);
        
        zh=zookeeper_init("localhost:2121",watcher,10000,TEST_CLIENT_ID,0,0);
        CPPUNIT_ASSERT(zh!=0);
        // simulate connected state
        forceConnected(zh);
        
        int fd=0;
        int interest=0;
        timeval tv;
        // first operation
        AsyncGetOperationCompletion res1;
        zkServer.addOperationResponse(new ZooGetResponse("1",1));
        int rc=zoo_aget(zh,"/x/y/1",0,asyncCompletion,&res1);
        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
        // second operation
        AsyncGetOperationCompletion res2;
        zkServer.addOperationResponse(new ZooGetResponse("2",1));
        rc=zoo_aget(zh,"/x/y/2",0,asyncCompletion,&res2);
        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
        // process the send queue
        rc=zookeeper_interest(zh,&fd,&interest,&tv);
        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
        while((rc=zookeeper_process(zh,interest))==ZOK) {
          millisleep(100);
          //printf("%d\n", rc);
        }
        //printf("RC = %d", rc);
        CPPUNIT_ASSERT_EQUAL((int)ZNOTHING,rc);

        CPPUNIT_ASSERT_EQUAL((int)ZOK,res1.rc_);
        CPPUNIT_ASSERT_EQUAL(string("1"),res1.value_);
        CPPUNIT_ASSERT_EQUAL((int)ZOK,res2.rc_);
        CPPUNIT_ASSERT_EQUAL(string("2"),res2.value_);
    }
    // send two getData requests and disconnect while the second request is
    // outstanding;
    // verify the completions are called
    void testOperationsAndDisconnectConcurrently1()
    {
        Mock_gettimeofday timeMock;
        ZookeeperServer zkServer;
        // must call zookeeper_close() while all the mocks are in scope
        CloseFinally guard(&zh);
        
        zh=zookeeper_init("localhost:2121",watcher,10000,TEST_CLIENT_ID,0,0);
        CPPUNIT_ASSERT(zh!=0);
        // simulate connected state
        forceConnected(zh);
        
        int fd=0;
        int interest=0;
        timeval tv;
        // first operation
        AsyncGetOperationCompletion res1;
        zkServer.addOperationResponse(new ZooGetResponse("1",1));
        int rc=zoo_aget(zh,"/x/y/1",0,asyncCompletion,&res1);
        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
        // second operation
        AsyncGetOperationCompletion res2;
        zkServer.addOperationResponse(new ZooGetResponse("2",1));
        rc=zoo_aget(zh,"/x/y/2",0,asyncCompletion,&res2);
        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
        // process the send queue
        rc=zookeeper_interest(zh,&fd,&interest,&tv);
        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
        rc=zookeeper_process(zh,interest);
        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
        // simulate a disconnect
        zkServer.setConnectionLost();
        rc=zookeeper_interest(zh,&fd,&interest,&tv);
        rc=zookeeper_process(zh,interest);
        CPPUNIT_ASSERT_EQUAL((int)ZCONNECTIONLOSS,rc);
        CPPUNIT_ASSERT_EQUAL((int)ZOK,res1.rc_);
        CPPUNIT_ASSERT_EQUAL(string("1"),res1.value_);
        CPPUNIT_ASSERT_EQUAL((int)ZCONNECTIONLOSS,res2.rc_);
        CPPUNIT_ASSERT_EQUAL(string(""),res2.value_);
    }
    // send two getData requests and simulate timeout while the both request
    // are pending;
    // verify the completions are called
    void testOperationsAndDisconnectConcurrently2()
    {
        Mock_gettimeofday timeMock;
        ZookeeperServer zkServer;
        // must call zookeeper_close() while all the mocks are in scope
        CloseFinally guard(&zh);
        
        zh=zookeeper_init("localhost:2121",watcher,10000,TEST_CLIENT_ID,0,0);
        CPPUNIT_ASSERT(zh!=0);
        // simulate connected state
        forceConnected(zh);
        
        int fd=0;
        int interest=0;
        timeval tv;
        // first operation
        AsyncGetOperationCompletion res1;
        zkServer.addOperationResponse(new ZooGetResponse("1",1));
        int rc=zoo_aget(zh,"/x/y/1",0,asyncCompletion,&res1);
        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
        // second operation
        AsyncGetOperationCompletion res2;
        zkServer.addOperationResponse(new ZooGetResponse("2",1));
        rc=zoo_aget(zh,"/x/y/2",0,asyncCompletion,&res2);
        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
        // simulate timeout
        timeMock.tick(+10); // advance system time by 10 secs
        // the next call to zookeeper_interest should return ZOPERATIONTIMEOUT
        rc=zookeeper_interest(zh,&fd,&interest,&tv);
        CPPUNIT_ASSERT_EQUAL((int)ZOPERATIONTIMEOUT,rc);
        // make sure the completions have been called
        CPPUNIT_ASSERT_EQUAL((int)ZOPERATIONTIMEOUT,res1.rc_);
        CPPUNIT_ASSERT_EQUAL((int)ZOPERATIONTIMEOUT,res2.rc_);
    }

    class PingCountingServer: public ZookeeperServer{
    public:
        PingCountingServer():pingCount_(0){}
        // called when a client request is received
        virtual void onMessageReceived(const RequestHeader& rh, iarchive* ia){
           if(rh.type==ZOO_PING_OP){
               pingCount_++;
           }
        }
        int pingCount_;
    };

    // establish a connection; idle for a while
    // verify ping was sent at least once
    void testPing()
    {
        const int TIMEOUT=9; // timeout in secs
        Mock_gettimeofday timeMock;
        PingCountingServer zkServer;
        // must call zookeeper_close() while all the mocks are in scope
        CloseFinally guard(&zh);
        
        // receive timeout is in milliseconds
        zh=zookeeper_init("localhost:1234",watcher,TIMEOUT*1000,TEST_CLIENT_ID,0,0);
        CPPUNIT_ASSERT(zh!=0);
        // simulate connected state
        forceConnected(zh);
        
        int fd=0;
        int interest=0;
        timeval tv;
        // Round 1.
        int rc=zookeeper_interest(zh,&fd,&interest,&tv);
        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
        // simulate waiting for the select() call to timeout; 
        // advance the system clock accordingly
        timeMock.tick(tv);  
        rc=zookeeper_process(zh,interest);
        CPPUNIT_ASSERT_EQUAL((int)ZNOTHING,rc);
        // verify no ping sent
        CPPUNIT_ASSERT(zkServer.pingCount_==0);
        
        // Round 2.
        // the client should have the idle threshold exceeded, by now
        rc=zookeeper_interest(zh,&fd,&interest,&tv);
        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
        // assume the socket is writable, so no idling here; move on to 
        // zookeeper_process immediately
        rc=zookeeper_process(zh,interest);
        // ZNOTHING means the client hasn't received a ping response yet
        CPPUNIT_ASSERT_EQUAL((int)ZNOTHING,rc);
        // verify a ping is sent
        CPPUNIT_ASSERT_EQUAL(1,zkServer.pingCount_);
        
        // Round 3.
        // we're going to receive a server PING response and make sure
        // that the client has updated its last_recv timestamp 
        zkServer.addRecvResponse(new PingResponse);
        rc=zookeeper_interest(zh,&fd,&interest,&tv);
        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
        // pseudo-sleep for a short while (10 ms)
        timeMock.millitick(10);
        rc=zookeeper_process(zh,interest);
        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
        // only one ping so far?
        CPPUNIT_ASSERT_EQUAL(1,zkServer.pingCount_);
        CPPUNIT_ASSERT(timeMock==zh->last_recv);

        // Round 4
        // make sure that a ping is not sent if something is outstanding
        AsyncGetOperationCompletion res1;
        rc=zoo_aget(zh,"/x/y/1",0,asyncCompletion,&res1);
        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
        rc=zookeeper_interest(zh,&fd,&interest,&tv);
        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
        timeMock.tick(tv);  
        rc=zookeeper_interest(zh,&fd,&interest,&tv);
        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
        rc=zookeeper_process(zh,interest);
        CPPUNIT_ASSERT_EQUAL((int)ZNOTHING,rc);
        rc=zookeeper_interest(zh,&fd,&interest,&tv);
        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
        // pseudo-sleep for a short while (10 ms)
        timeMock.millitick(10);
        rc=zookeeper_process(zh,interest);
        CPPUNIT_ASSERT_EQUAL((int)ZNOTHING,rc);
        // only one ping so far?
        CPPUNIT_ASSERT_EQUAL(1,zkServer.pingCount_);
    }

    // ZOOKEEPER-2253: Permit unsolicited pings
    void testUnsolicitedPing()
    {
        const int TIMEOUT=9; // timeout in secs
        Mock_gettimeofday timeMock;
        PingCountingServer zkServer;
        // must call zookeeper_close() while all the mocks are in scope
        CloseFinally guard(&zh);

        // receive timeout is in milliseconds
        zh=zookeeper_init("localhost:1234",watcher,TIMEOUT*1000,TEST_CLIENT_ID,0,0);
        CPPUNIT_ASSERT(zh!=0);
        // simulate connected state
        forceConnected(zh);

        int fd=0;
        int interest=0;
        timeval tv;

        int rc=zookeeper_interest(zh,&fd,&interest,&tv);
        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);

        // verify no ping sent
        CPPUNIT_ASSERT(zkServer.pingCount_==0);

        // we're going to receive a unsolicited PING response; ensure
        // that the client has updated its last_recv timestamp
        timeMock.tick(tv);
        zkServer.addRecvResponse(new PingResponse);
        rc=zookeeper_process(zh,interest);
        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
        CPPUNIT_ASSERT(timeMock==zh->last_recv);
    }

    // simulate a watch arriving right before a ping is due
    // assert the ping is sent nevertheless
    void testTimeoutCausedByWatches1()
    {
        const int TIMEOUT=9; // timeout in secs
        Mock_gettimeofday timeMock;
        PingCountingServer zkServer;
        // must call zookeeper_close() while all the mocks are in scope
        CloseFinally guard(&zh);
        
        // receive timeout is in milliseconds
        zh=zookeeper_init("localhost:1234",watcher,TIMEOUT*1000,TEST_CLIENT_ID,0,0);
        CPPUNIT_ASSERT(zh!=0);
        // simulate connected state
        forceConnected(zh);
        
        int fd=0;
        int interest=0;
        timeval tv;
        // Round 1.
        int rc=zookeeper_interest(zh,&fd,&interest,&tv);
        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
        // simulate waiting for the select() call to timeout; 
        // advance the system clock accordingly
        timeMock.tick(tv);
        timeMock.tick(-1); // set the clock to a millisecond before a ping is due
        // trigger a watch now
        zkServer.addRecvResponse(new ZNodeEvent(ZOO_CHANGED_EVENT,"/x/y/z"));
        rc=zookeeper_process(zh,interest);
        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
        // arrival of a watch sets the last_recv to the current time
        CPPUNIT_ASSERT(timeMock==zh->last_recv);
        // spend 1 millisecond by processing the watch
        timeMock.tick(1);
        
        // Round 2.
        // a ping is due; zookeeper_interest() must send it now
        rc=zookeeper_interest(zh,&fd,&interest,&tv);
        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
        // no delay here -- as if the socket is immediately writable
        rc=zookeeper_process(zh,interest);
        CPPUNIT_ASSERT_EQUAL((int)ZNOTHING,rc);
        // verify a ping is sent
        CPPUNIT_ASSERT_EQUAL(1,zkServer.pingCount_);        
    }

    // similar to testTimeoutCausedByWatches1, but this time the watch is 
    // triggered while the client has an outstanding request
    // assert the ping is sent on time
    void testTimeoutCausedByWatches2()
    {
        const int TIMEOUT=9; // timeout in secs
        Mock_gettimeofday now;
        PingCountingServer zkServer;
        // must call zookeeper_close() while all the mocks are in scope
        CloseFinally guard(&zh);
        
        // receive timeout is in milliseconds
        zh=zookeeper_init("localhost:1234",watcher,TIMEOUT*1000,TEST_CLIENT_ID,0,0);
        CPPUNIT_ASSERT(zh!=0);
        // simulate connected state
        forceConnected(zh);
        
        // queue up a request; keep it pending (as if the server is busy or has died)
        AsyncGetOperationCompletion res1;
        zkServer.addOperationResponse(new ZooGetResponse("2",1));
        int rc=zoo_aget(zh,"/x/y/1",0,asyncCompletion,&res1);

        int fd=0;
        int interest=0;
        timeval tv;
        // Round 1.
        // send the queued up zoo_aget() request
        Mock_gettimeofday beginningOfTimes(now); // remember when we started
        rc=zookeeper_interest(zh,&fd,&interest,&tv);
        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
        // no delay -- the socket is writable
        rc=zookeeper_process(zh,interest);
        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc); 
        
        // Round 2.
        // what's next?
        rc=zookeeper_interest(zh,&fd,&interest,&tv);
        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
        // no response from the server yet -- waiting in the select() call
        now.tick(tv);
        // a watch has arrived, thus preventing the connection from timing out 
        zkServer.addRecvResponse(new ZNodeEvent(ZOO_CHANGED_EVENT,"/x/y/z"));        
        rc=zookeeper_process(zh,interest);
        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc); // read the watch message
        CPPUNIT_ASSERT_EQUAL(0,zkServer.pingCount_); // not yet!
        
        //Round 3.
        // now is the time to send a ping; make sure it's actually sent
        rc=zookeeper_interest(zh,&fd,&interest,&tv);
        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
        rc=zookeeper_process(zh,interest);
        CPPUNIT_ASSERT_EQUAL((int)ZNOTHING,rc);
        // verify a ping is sent
        CPPUNIT_ASSERT_EQUAL(1,zkServer.pingCount_);
        // make sure only 1/3 of the timeout has passed
        CPPUNIT_ASSERT_EQUAL((int32_t)TIMEOUT/3*1000,toMilliseconds(now-beginningOfTimes));
    }

    // ZOOKEEPER-2894: Memory and completions leak on zookeeper_close
    // while there is a request waiting for being processed
    // call zookeeper_close() from the main event loop
    // assert the completion callback is called
    void testCloseWhileInProgressFromMain()
    {
        Mock_gettimeofday timeMock;
        ZookeeperServer zkServer;
        CloseFinally guard(&zh);

        zh=zookeeper_init("localhost:2121",watcher,10000,TEST_CLIENT_ID,0,0);
        CPPUNIT_ASSERT(zh!=0);
        forceConnected(zh);
        zhandle_t* savezh=zh;

        // issue a request
        zkServer.addOperationResponse(new ZooGetResponse("1",1));
        AsyncGetOperationCompletion res1;
        int rc=zoo_aget(zh,"/x/y/1",0,asyncCompletion,&res1);
        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);

        // but do not allow Zookeeper C Client to process the request
        // and call zookeeper_close() from the main event loop immediately
        Mock_free_noop freeMock;
        rc=zookeeper_close(zh); zh=0;
        freeMock.disable();
        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);

        // verify that memory for completions was freed (would be freed if no mock installed)
        CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(savezh));
        CPPUNIT_ASSERT(savezh->completions_to_process.head==0);
        CPPUNIT_ASSERT(savezh->completions_to_process.last==0);

        // verify that completion was called, and it was called with ZCLOSING status
        CPPUNIT_ASSERT(res1.called_);
        CPPUNIT_ASSERT_EQUAL((int)ZCLOSING,res1.rc_);
    }

    // ZOOKEEPER-2894: Memory and completions leak on zookeeper_close
    // send some request #1
    // then, while there is a request #2 waiting for being processed
    // call zookeeper_close() from the completion callback of request #1
    // assert the completion callback #2 is called
    void testCloseWhileInProgressFromCompletion()
    {
        Mock_gettimeofday timeMock;
        ZookeeperServer zkServer;
        CloseFinally guard(&zh);

        zh=zookeeper_init("localhost:2121",watcher,10000,TEST_CLIENT_ID,0,0);
        CPPUNIT_ASSERT(zh!=0);
        forceConnected(zh);
        zhandle_t* savezh=zh;

        // will handle completion on request #1 and issue request #2 from it
        class AsyncGetOperationCompletion1: public AsyncCompletion{
        public:
            AsyncGetOperationCompletion1(zhandle_t **zh, ZookeeperServer *zkServer, 
                    AsyncGetOperationCompletion *res2)
            :zh_(zh),zkServer_(zkServer),res2_(res2){}

            virtual void dataCompl(int rc1, const char *value, int len, const Stat *stat){
                CPPUNIT_ASSERT_EQUAL((int)ZOK,rc1);

                // from the completion #1 handler, issue request #2
                zkServer_->addOperationResponse(new ZooGetResponse("2",1));
                int rc2=zoo_aget(*zh_,"/x/y/2",0,asyncCompletion,res2_);
                CPPUNIT_ASSERT_EQUAL((int)ZOK,rc2);

                // but do not allow Zookeeper C Client to process the request #2
                // and call zookeeper_close() from the completion callback of request #1
                rc2=zookeeper_close(*zh_); *zh_=0;
                CPPUNIT_ASSERT_EQUAL((int)ZOK,rc2);

                // do not disable freeMock here, let completion #2 handler
                // return through ZooKeeper C Client internals to the main loop
                // and fulfill the work
            }

            zhandle_t **zh_;
            ZookeeperServer *zkServer_;
            AsyncGetOperationCompletion *res2_;
        };

        // issue request #1
        AsyncGetOperationCompletion res2;
        AsyncGetOperationCompletion1 res1(&zh,&zkServer,&res2);
        zkServer.addOperationResponse(new ZooGetResponse("1",1));
        int rc=zoo_aget(zh,"/x/y/1",0,asyncCompletion,&res1);
        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);

        // process the send queue
        int fd; int interest; timeval tv;
        rc=zookeeper_interest(zh,&fd,&interest,&tv);
        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
        CPPUNIT_ASSERT(zh!=0);
        Mock_free_noop freeMock;
        while(zh!=0 && (rc=zookeeper_process(zh,interest))==ZOK) {
          millisleep(100);
        }
        freeMock.disable();
        CPPUNIT_ASSERT(zh==0);

        // verify that memory for completions was freed (would be freed if no mock installed)
        CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(savezh));
        CPPUNIT_ASSERT(savezh->completions_to_process.head==0);
        CPPUNIT_ASSERT(savezh->completions_to_process.last==0);

        // verify that completion #2 was called, and it was called with ZCLOSING status
        CPPUNIT_ASSERT(res2.called_);
        CPPUNIT_ASSERT_EQUAL((int)ZCLOSING,res2.rc_);
    }

    // ZOOKEEPER-2891: Invalid processing of zookeeper_close for mutli-request
    // while there is a multi request waiting for being processed
    // call zookeeper_close() from the main event loop
    // assert the completion callback is called with status ZCLOSING
    void testCloseWhileMultiInProgressFromMain()
    {
        Mock_gettimeofday timeMock;
        ZookeeperServer zkServer;
        CloseFinally guard(&zh);

        zh=zookeeper_init("localhost:2121",watcher,10000,TEST_CLIENT_ID,0,0);
        CPPUNIT_ASSERT(zh!=0);
        forceConnected(zh);
        zhandle_t* savezh=zh;

        // issue a multi request
        int nops=2;
        zoo_op_t ops[nops];
        zoo_op_result_t results[nops];
        zoo_create_op_init(&ops[0],"/a",0,-1,&ZOO_OPEN_ACL_UNSAFE,0,0,0);
        zoo_create_op_init(&ops[1],"/a/b",0,-1,&ZOO_OPEN_ACL_UNSAFE,0,0,0);
        // TODO: Provide ZooMultiResponse. However, it's not required in this test.
        // zkServer.addOperationResponse(new ZooMultiResponse(...));
        AsyncVoidOperationCompletion res1;
        int rc=zoo_amulti(zh,nops,ops,results,asyncCompletion,&res1);
        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);

        // but do not allow Zookeeper C Client to process the request
        // and call zookeeper_close() from the main event loop immediately
        Mock_free_noop freeMock;
        rc=zookeeper_close(zh); zh=0;
        freeMock.disable();
        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);

        // verify that memory for completions was freed (would be freed if no mock installed)
        CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(savezh));
        CPPUNIT_ASSERT(savezh->completions_to_process.head==0);
        CPPUNIT_ASSERT(savezh->completions_to_process.last==0);

        // verify that completion was called, and it was called with ZCLOSING status
        CPPUNIT_ASSERT(res1.called_);
        CPPUNIT_ASSERT_EQUAL((int)ZCLOSING,res1.rc_);
    }

    // ZOOKEEPER-2891: Invalid processing of zookeeper_close for mutli-request
    // send some request #1 (not a multi request)
    // then, while there is a multi request #2 waiting for being processed
    // call zookeeper_close() from the completion callback of request #1
    // assert the completion callback #2 is called with status ZCLOSING
    void testCloseWhileMultiInProgressFromCompletion()
    {
        Mock_gettimeofday timeMock;
        ZookeeperServer zkServer;
        CloseFinally guard(&zh);

        zh=zookeeper_init("localhost:2121",watcher,10000,TEST_CLIENT_ID,0,0);
        CPPUNIT_ASSERT(zh!=0);
        forceConnected(zh);
        zhandle_t* savezh=zh;

        // these shall persist during the test
        int nops=2;
        zoo_op_t ops[nops];
        zoo_op_result_t results[nops];

        // will handle completion on request #1 and issue request #2 from it
        class AsyncGetOperationCompletion1: public AsyncCompletion{
        public:
            AsyncGetOperationCompletion1(zhandle_t **zh, ZookeeperServer *zkServer,
                    AsyncVoidOperationCompletion *res2,
                    int nops, zoo_op_t* ops, zoo_op_result_t* results)
            :zh_(zh),zkServer_(zkServer),res2_(res2),nops_(nops),ops_(ops),results_(results){}

            virtual void dataCompl(int rc1, const char *value, int len, const Stat *stat){
                CPPUNIT_ASSERT_EQUAL((int)ZOK,rc1);

                // from the completion #1 handler, issue multi request #2
                assert(nops_>=2);
                zoo_create_op_init(&ops_[0],"/a",0,-1,&ZOO_OPEN_ACL_UNSAFE,0,0,0);
                zoo_create_op_init(&ops_[1],"/a/b",0,-1,&ZOO_OPEN_ACL_UNSAFE,0,0,0);
                // TODO: Provide ZooMultiResponse. However, it's not required in this test.
                // zkServer_->addOperationResponse(new ZooMultiResponse(...));
                int rc2=zoo_amulti(*zh_,nops_,ops_,results_,asyncCompletion,res2_);
                CPPUNIT_ASSERT_EQUAL((int)ZOK,rc2);

                // but do not allow Zookeeper C Client to process the request #2
                // and call zookeeper_close() from the completion callback of request #1
                rc2=zookeeper_close(*zh_); *zh_=0;
                CPPUNIT_ASSERT_EQUAL((int)ZOK,rc2);

                // do not disable freeMock here, let completion #2 handler
                // return through ZooKeeper C Client internals to the main loop
                // and fulfill the work
            }

            zhandle_t **zh_;
            ZookeeperServer *zkServer_;
            AsyncVoidOperationCompletion *res2_;
            int nops_;
            zoo_op_t* ops_;
            zoo_op_result_t* results_;
        };

        // issue some request #1 (not a multi request)
        AsyncVoidOperationCompletion res2;
        AsyncGetOperationCompletion1 res1(&zh,&zkServer,&res2,nops,ops,results);
        zkServer.addOperationResponse(new ZooGetResponse("1",1));
        int rc=zoo_aget(zh,"/x/y/1",0,asyncCompletion,&res1);
        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);

        // process the send queue
        int fd; int interest; timeval tv;
        rc=zookeeper_interest(zh,&fd,&interest,&tv);
        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
        CPPUNIT_ASSERT(zh!=0);
        Mock_free_noop freeMock;
        while(zh!=0 && (rc=zookeeper_process(zh,interest))==ZOK) {
          millisleep(100);
        }
        freeMock.disable();
        CPPUNIT_ASSERT(zh==0);

        // verify that memory for completions was freed (would be freed if no mock installed)
        CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(savezh));
        CPPUNIT_ASSERT(savezh->completions_to_process.head==0);
        CPPUNIT_ASSERT(savezh->completions_to_process.last==0);

        // verify that completion #2 was called, and it was called with ZCLOSING status
        CPPUNIT_ASSERT(res2.called_);
        CPPUNIT_ASSERT_EQUAL((int)ZCLOSING,res2.rc_);
    }

#else   
    class TestGetDataJob: public TestJob{
    public:
        TestGetDataJob(ZookeeperServer* svr,zhandle_t* zh, int reps=500)
            :svr_(svr),zh_(zh),rc_(ZAPIERROR),reps_(reps){}
        virtual void run(){
            int i;
            for(i=0;i<reps_;i++){
                char buf;
                int size=sizeof(buf);

                if (i % 10 == 0) {
                    // We need to pause every once in a while so we don't
                    // get too far ahead and finish before the disconnect
	            millisleep(1);
                }
                svr_->addOperationResponse(new ZooGetResponse("1",1));
                rc_=zoo_get(zh_,"/x/y/z",0,&buf,&size,0);
                if(rc_!=ZOK){
                    break;
                }
            }
        }
        ZookeeperServer* svr_;
        zhandle_t* zh_;
        int rc_;
        int reps_;
    };
    class TestConcurrentOpJob: public TestGetDataJob{
    public:
        static const int REPS=500;
        TestConcurrentOpJob(ZookeeperServer* svr,zhandle_t* zh):
            TestGetDataJob(svr,zh,REPS){}
        virtual TestJob* clone() const {
            return new TestConcurrentOpJob(svr_,zh_);
        }
        virtual void validate(const char* file, int line) const{
            CPPUNIT_ASSERT_EQUAL_MESSAGE_LOC("ZOK != rc",(int)ZOK,rc_,file,line);
        }
    };
    void testConcurrentOperations1()
    {
        for(int counter=0; counter<50; counter++){
            // frozen time -- no timeouts and no pings
            Mock_gettimeofday timeMock;
            
            ZookeeperServer zkServer;
            Mock_poll pollMock(&zkServer,ZookeeperServer::FD);
            // must call zookeeper_close() while all the mocks are in the scope!
            CloseFinally guard(&zh);
            
            zh=zookeeper_init("localhost:2121",watcher,10000,TEST_CLIENT_ID,0,0);
            CPPUNIT_ASSERT(zh!=0);
            // make sure the client has connected
            CPPUNIT_ASSERT(ensureCondition(ClientConnected(zh),1000)<1000);
            
            TestJobManager jmgr(TestConcurrentOpJob(&zkServer,zh),10);
            jmgr.startAllJobs();
            jmgr.wait();
            // validate test results
            VALIDATE_JOBS(jmgr);
        }
    }
    class ZKGetJob: public TestJob{
    public:
        static const int REPS=1000;
        ZKGetJob(zhandle_t* zh)
            :zh_(zh),rc_(ZAPIERROR){}
        virtual TestJob* clone() const {
            return new ZKGetJob(zh_);
        }
        virtual void run(){
            int i;
            for(i=0;i<REPS;i++){
                char buf;
                int size=sizeof(buf);                
                rc_=zoo_get(zh_,"/xyz",0,&buf,&size,0);
                if(rc_!=ZOK){
                    break;
                }
            }
            //TEST_TRACE(("Finished %d iterations",i));
        }
        virtual void validate(const char* file, int line) const{
            CPPUNIT_ASSERT_EQUAL_MESSAGE_LOC("ZOK != rc",(int)ZOK,rc_,file,line);
        }
        zhandle_t* zh_;
        int rc_;
    };

    // this test connects to a real ZK server and creates the /xyz node and sends
    // lots of zoo_get requests.
    // to run this test use the following command:
    // zktest-mt Zookeeper_operations::testOperationsAndDisconnectConcurrently2 localhost:3181
    // where the second parameter is the server host and port
    void testOperationsAndDisconnectConcurrently2()
    {
        if(globalTestConfig.getTestName().find(__func__)==string::npos || 
                globalTestConfig.getExtraOptCount()==0)
        {
            // only run this test when specifically asked so
            return;
        }
        string host(*(globalTestConfig.getExtraOptBegin()));
        zhandle_t* lzh=zookeeper_init(host.c_str(),watcher,10000,0,0,0);
        CPPUNIT_ASSERT(lzh!=0);
        // make sure the client has connected
        CPPUNIT_ASSERT_MESSAGE("Unable to connect to the host",
                ensureCondition(ClientConnected(zh),5000)<5000);
        
        char realpath[1024];
        int rc=zoo_create(lzh,"/xyz","1",1,&ZOO_OPEN_ACL_UNSAFE,0,realpath,sizeof(realpath)-1);
        CPPUNIT_ASSERT(rc==ZOK || rc==ZNODEEXISTS);
        zookeeper_close(lzh); 
  
        for(int counter=0; counter<200; counter++){
            TEST_TRACE(("Loop count %d",counter));
            
            CloseFinally guard(&zh);

            zh=zookeeper_init(host.c_str(),watcher,10000,0,0,0);
            CPPUNIT_ASSERT(zh!=0);
            // make sure the client has connected
            CPPUNIT_ASSERT_MESSAGE("Unable to connect to the host",
                    ensureCondition(ClientConnected(zh),5000)<5000);
            
            TestJobManager jmgr(ZKGetJob(zh),10);
            jmgr.startJobsImmediately();
            jmgr.wait();
            VALIDATE_JOBS(jmgr);
            TEST_TRACE(("run %d finished",counter));
        }

    }

    class TestConcurrentOpWithDisconnectJob: public TestGetDataJob{
    public:
        static const int REPS=1000;
        TestConcurrentOpWithDisconnectJob(ZookeeperServer* svr,zhandle_t* zh):
            TestGetDataJob(svr,zh,REPS){}
        virtual TestJob* clone() const {
            return new TestConcurrentOpWithDisconnectJob(svr_,zh_);
        }
        virtual void validate(const char* file, int line) const{
            CPPUNIT_ASSERT_EQUAL_MESSAGE_LOC("ZCONNECTIONLOSS != rc",(int)ZCONNECTIONLOSS,rc_,file,line);
        }
    };

    // this test is not 100% accurate in a sense it may not detect all error cases.
    // TODO: I can't think of a test that is 100% accurate and doesn't interfere
    //       with the code being tested (in terms of introducing additional 
    //       implicit synchronization points)
    void testOperationsAndDisconnectConcurrently1()
    {
        for(int counter=0; counter<50; counter++){
            //TEST_TRACE(("Loop count %d",counter));
            // frozen time -- no timeouts and no pings
            Mock_gettimeofday timeMock;
            
            ZookeeperServer zkServer;
            Mock_poll pollMock(&zkServer,ZookeeperServer::FD);
            // must call zookeeper_close() while all the mocks are in the scope!
            CloseFinally guard(&zh);
            
            zh=zookeeper_init("localhost:2121",watcher,10000,TEST_CLIENT_ID,0,0);
            CPPUNIT_ASSERT(zh!=0);
            // make sure the client has connected
            CPPUNIT_ASSERT(ensureCondition(ClientConnected(zh),1000)<1000);
            
            TestJobManager jmgr(TestConcurrentOpWithDisconnectJob(&zkServer,zh),10);
            jmgr.startJobsImmediately();
            // let everything startup before we shutdown the server
            millisleep(4);
            // reconnect attempts will start failing immediately 
            zkServer.setServerDown(0);
            // next recv call will return 0
            zkServer.setConnectionLost();
            jmgr.wait();
            VALIDATE_JOBS(jmgr);
        }
        
    }
    // call zoo_aget() in the multithreaded mode
    void testAsyncGetOperation()
    {
        Mock_gettimeofday timeMock;
        
        ZookeeperServer zkServer;
        Mock_poll pollMock(&zkServer,ZookeeperServer::FD);
        // must call zookeeper_close() while all the mocks are in the scope!
        CloseFinally guard(&zh);
        
        zh=zookeeper_init("localhost:2121",watcher,10000,TEST_CLIENT_ID,0,0);
        CPPUNIT_ASSERT(zh!=0);
        // make sure the client has connected
        CPPUNIT_ASSERT(ensureCondition(ClientConnected(zh),1000)<1000);

        AsyncGetOperationCompletion res1;
        zkServer.addOperationResponse(new ZooGetResponse("1",1));
        int rc=zoo_aget(zh,"/x/y/1",0,asyncCompletion,&res1);
        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
        
        CPPUNIT_ASSERT(ensureCondition(res1,1000)<1000);
        CPPUNIT_ASSERT_EQUAL((int)ZOK,res1.rc_);
        CPPUNIT_ASSERT_EQUAL(string("1"),res1.value_);        
    }
    class ChangeNodeWatcher: public WatcherAction{
    public:
        ChangeNodeWatcher():changed_(false){}
        virtual void onNodeValueChanged(zhandle_t*,const char* path){
            synchronized(mx_);
            changed_=true;
            if(path!=0) path_=path;
        }
        // this predicate checks if CHANGE_EVENT event type was triggered, unlike
        // the isWatcherTriggered() that returns true whenever a watcher is triggered
        // regardless of the event type
        SyncedBoolCondition isNodeChangedTriggered() const{
            return SyncedBoolCondition(changed_,mx_);
        }
        bool changed_;
        string path_;
    };
    
    class AsyncWatcherCompletion: public AsyncCompletion{
    public:
        AsyncWatcherCompletion(ZookeeperServer& zkServer):zkServer_(zkServer){}
        virtual void statCompl(int rc, const Stat *stat){
            // we received a server response, now enqueue a watcher event
            // to trigger the watcher
            zkServer_.addRecvResponse(new ZNodeEvent(ZOO_CHANGED_EVENT,"/x/y/z"));
        }
        ZookeeperServer& zkServer_;
    };
    // verify that async watcher is called for znode events (CREATED, DELETED etc.)
    void testAsyncWatcher1(){
        Mock_gettimeofday timeMock;
        
        ZookeeperServer zkServer;
        Mock_poll pollMock(&zkServer,ZookeeperServer::FD);
        // must call zookeeper_close() while all the mocks are in the scope!
        CloseFinally guard(&zh);
        
        ChangeNodeWatcher action;        
        zh=zookeeper_init("localhost:2121",activeWatcher,10000,
                TEST_CLIENT_ID,&action,0);
        CPPUNIT_ASSERT(zh!=0);
        // make sure the client has connected
        CPPUNIT_ASSERT(ensureCondition(ClientConnected(zh),1000)<1000);
        
        // set the watcher
        AsyncWatcherCompletion completion(zkServer);
        // prepare a response for the zoo_aexists() request
        zkServer.addOperationResponse(new ZooStatResponse);
        int rc=zoo_aexists(zh,"/x/y/z",1,asyncCompletion,&completion);
        CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
        
        CPPUNIT_ASSERT(ensureCondition(action.isNodeChangedTriggered(),1000)<1000);
        CPPUNIT_ASSERT_EQUAL(string("/x/y/z"),action.path_);                
    }
#endif
};

CPPUNIT_TEST_SUITE_REGISTRATION(Zookeeper_operations);
