blob: f21bee31fee0a442c75bef5e7bb83a15540991a2 [file] [log] [blame]
try:
import redis
except ImportError: # pragma: no cover
import sys; sys.exit('please, install redis-py package to use redis-store')
import threading
try:
import cPickle as pickle
except ImportError:
import pickle
from coilmq.store import QueueStore
from coilmq.util.concurrency import synchronized
from coilmq.config import config
__authors__ = ('"Hans Lellelid" <hans@xmpl.org>', '"Alexander Zhukov" <zhukovaa90@gmail.com>')
__copyright__ = "Copyright 2009 Hans Lellelid"
__license__ = """Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License."""
lock = threading.RLock()
def make_redis_store(cfg=None):
return RedisQueueStore(
redis_conn=redis.Redis(**dict((cfg or config).items('redis'))))
class RedisQueueStore(QueueStore):
"""Simple Queue with Redis Backend"""
def __init__(self, redis_conn=None):
"""The default connection parameters are: host='localhost', port=6379, db=0"""
self.__db = redis_conn or redis.Redis()
# self.key = '{0}:{1}'.format(namespace, name)
super(RedisQueueStore, self).__init__()
@synchronized(lock)
def enqueue(self, destination, frame):
self.__db.rpush(destination, pickle.dumps(frame))
@synchronized(lock)
def dequeue(self, destination):
item = self.__db.lpop(destination)
if item:
return pickle.loads(item)
@synchronized(lock)
def requeue(self, destination, frame):
self.enqueue(destination, frame)
@synchronized(lock)
def size(self, destination):
return self.__db.llen(destination)
@synchronized(lock)
def has_frames(self, destination):
return self.size(destination) > 0
@synchronized(lock)
def destinations(self):
return self.__db.keys()