blob: 3913fe3b7d98d5d12ef02f24f39cfab4e5cb6909 [file] [log] [blame]
#!/usr/bin/python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest, threading, re, sys
# Python 2 compatibility: bind ``range`` to the lazy ``xrange`` so the
# rest of the module can use one spelling on either major version.
if sys.version_info[0] < 3:
    range = xrange
import zookeeper, zktestbase
# ACL entry handed to zookeeper.create() by the tests below: scheme
# "world" / id "anyone" with the five low permission bits set (0x1f).
# NOTE(review): presumably this mirrors the C client's open/unsafe ACL
# (all permissions for everyone) -- confirm against the ZooKeeper docs.
ZOO_OPEN_ACL_UNSAFE = {
    "perms": 0x1f,
    "scheme": "world",
    "id": "anyone",
}
class ConnectionTest(zktestbase.TestBase):
    """Test whether we can make a connection."""

    def setUp(self):
        # Deliberate no-op: every test in this class opens its own handles.
        # NOTE(review): this overrides whatever zktestbase.TestBase.setUp
        # does -- confirm that skipping it is intended.
        pass

    def _close_quietly(self, handle):
        """Close *handle*, ignoring the error raised for an already-closed one.

        Used only for cleanup, where a secondary ZooKeeperException must not
        mask the assertion failure that is actually being reported.
        """
        try:
            zookeeper.close(handle)
        except zookeeper.ZooKeeperException:
            pass

    def _connect_with_watcher(self, timeout=15.0):
        """Open a watched connection to ``self.host`` and wait for it.

        On a CONNECTED_STATE event the watcher sets ``self.connected`` to
        True and stores the handle it was given in ``self.handle``. If no
        such event arrives within *timeout* seconds the new handle is closed
        and the current test fails.

        Returns the handle returned by ``zookeeper.init``.
        """
        cv = threading.Condition()
        self.connected = False

        def connection_watcher(handle, event_type, state, path):
            # The watcher is invoked asynchronously (hence the condition
            # variable), so an assertion raised here would not reliably
            # reach the test runner. Only record the outcome; the test
            # thread does the asserting.
            with cv:
                if state == zookeeper.CONNECTED_STATE:
                    self.connected = True
                    self.handle = handle
                    cv.notify()

        # Hold the condition across init() so the watcher cannot notify
        # before this thread has started waiting.
        with cv:
            handle = zookeeper.init(self.host, connection_watcher)
            cv.wait(timeout)
            connected = self.connected

        if not connected:
            self._close_quietly(handle)
            self.fail("Connection timed out to " + self.host)
        return handle

    def testconnection(self):
        """Connect, check the reported state, and verify closed handles are rejected."""
        self._connect_with_watcher()
        self.assertEqual(zookeeper.CONNECTED_STATE, zookeeper.state(self.handle))
        self.assertEqual(zookeeper.close(self.handle), zookeeper.OK)
        # Trying to close the same handle twice is an error, and the C library
        # will segfault on it, so make sure this is caught at the Python
        # module layer.
        self.assertRaises(zookeeper.ZooKeeperException,
                          zookeeper.close,
                          self.handle)
        self.assertRaises(zookeeper.ZooKeeperException,
                          zookeeper.get,
                          self.handle,
                          "/")

    def testhandlereuse(self):
        """
        Test a) multiple concurrent connections b) reuse of closed handles
        """
        handles = [zookeeper.init(self.host) for _ in range(10)]
        # Every handle that is currently open and must be closed on exit.
        still_open = list(handles)
        try:
            still_open.append(self._connect_with_watcher())

            # NOTE(review): the unwatched handles were opened before the
            # watched one, but nothing here waits for them explicitly, so
            # this check relies on them having connected by now.
            states = [zookeeper.state(handle) for handle in handles]
            self.assertEqual([zookeeper.CONNECTED_STATE] * len(handles), states,
                             "Not all connections succeeded")

            oldhandle = handles[3]
            zookeeper.close(oldhandle)
            still_open.remove(oldhandle)
            newhandle = zookeeper.init(self.host)
            still_open.append(newhandle)

            # This assertion tests *internal* behaviour; i.e. that the module
            # correctly reuses closed handles. This is therefore
            # implementation dependent.
            self.assertEqual(newhandle, oldhandle, "Didn't get reused handle")
        finally:
            for handle in still_open:
                self._close_quietly(handle)

    def testmanyhandles(self):
        """
        Test the ability of the module to support many handles.
        """
        # We'd like to do more, but currently the C client doesn't
        # work with > 83 handles (fails to create a pipe) on MacOS 10.5.8
        handles = [zookeeper.init(self.host) for _ in range(9)]
        # Every handle that is currently open and must be closed on exit.
        still_open = list(handles)
        try:
            still_open.append(self._connect_with_watcher())

            for i, h in enumerate(handles):
                path = "/zkpython-test-handles-%s" % str(i)
                self.assertEqual(
                    path,
                    zookeeper.create(h, path, "", [ZOO_OPEN_ACL_UNSAFE],
                                     zookeeper.EPHEMERAL))

            # Close every handle before asserting, so one bad return code
            # does not leave the remaining handles open.
            results = [zookeeper.close(h) for h in handles]
            del still_open[:len(handles)]
            self.assertEqual([zookeeper.OK] * len(handles), results)
        finally:
            for handle in still_open:
                self._close_quietly(handle)

    def testversionstringexists(self):
        """The module exposes a dotted, numeric ``__version__`` string."""
        self.assertTrue(hasattr(zookeeper, '__version__'))
        # Raw string with escaped dots: match "N.N.N" literally, allowing
        # multi-digit components such as "3.10.0".
        self.assertTrue(re.match(r"\d+\.\d+\.\d+", zookeeper.__version__))

    def tearDown(self):
        # Deliberate no-op: each test closes the handles it opened.
        # NOTE(review): this overrides zktestbase.TestBase.tearDown --
        # confirm that skipping it is intended.
        pass
# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()