blob: d6436be2dc8c65a82be9317167ad8a17c2864267 [file] [log] [blame]
"""
Non-durable topic support functionality.
This code is inspired by the design of the Ruby stompserver project, by
Patrick Hurley and Lionel Bouton. See http://stompserver.rubyforge.org/
"""
import logging
import threading
import uuid
from collections import defaultdict
from coilmq.util.concurrency import synchronized
__authors__ = ['"Hans Lellelid" <hans@xmpl.org>']
__copyright__ = "Copyright 2009 Hans Lellelid"
__license__ = """Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License."""
lock = threading.RLock()
class TopicManager(object):
"""
Class that manages distribution of messages to topic subscribers.
This class uses C{threading.RLock} to guard the public methods. This is probably
a bit excessive, given 1) the actomic nature of basic C{dict} read/write operations
and 2) the fact that most of the internal data structures are keying off of the
STOMP connection, which is going to be thread-isolated. That said, this seems like
the technically correct approach and should increase the chance of this code being
portable to non-GIL systems.
@ivar _topics: A dict of registered topics, keyed by destination.
@type _topics: C{dict} of C{str} to C{set} of L{coilmq.server.StompConnection}
"""
def __init__(self):
self.log = logging.getLogger(
'%s.%s' % (__name__, self.__class__.__name__))
# Lock var is required for L{synchornized} decorator.
self._lock = threading.RLock()
self._topics = defaultdict(set)
# TODO: If we want durable topics, we'll need a store for topics.
@synchronized(lock)
def close(self):
"""
Closes all resources associated with this topic manager.
(Currently this is simply here for API conformity w/ L{coilmq.queue.QueueManager}.)
"""
self.log.info("Shutting down topic manager.") # pragma: no cover
@synchronized(lock)
def subscribe(self, connection, destination):
"""
Subscribes a connection to the specified topic destination.
@param connection: The client connection to subscribe.
@type connection: L{coilmq.server.StompConnection}
@param destination: The topic destination (e.g. '/topic/foo')
@type destination: C{str}
"""
self.log.debug("Subscribing %s to %s" % (connection, destination))
self._topics[destination].add(connection)
@synchronized(lock)
def unsubscribe(self, connection, destination):
"""
Unsubscribes a connection from the specified topic destination.
@param connection: The client connection to unsubscribe.
@type connection: L{coilmq.server.StompConnection}
@param destination: The topic destination (e.g. '/topic/foo')
@type destination: C{str}
"""
self.log.debug("Unsubscribing %s from %s" % (connection, destination))
if connection in self._topics[destination]:
self._topics[destination].remove(connection)
if not self._topics[destination]:
del self._topics[destination]
@synchronized(lock)
def disconnect(self, connection):
"""
Removes a subscriber connection.
@param connection: The client connection to unsubscribe.
@type connection: L{coilmq.server.StompConnection}
"""
self.log.debug("Disconnecting %s" % connection)
for dest in list(self._topics.keys()):
if connection in self._topics[dest]:
self._topics[dest].remove(connection)
if not self._topics[dest]:
# This won't trigger RuntimeError, since we're using keys()
del self._topics[dest]
@synchronized(lock)
def send(self, message):
"""
Sends a message to all subscribers of destination.
@param message: The message frame. (The frame will be modified to set command
to MESSAGE and set a message id.)
@type message: L{stompclient.frame.Frame}
"""
dest = message.headers.get('destination')
if not dest:
raise ValueError(
"Cannot send frame with no destination: %s" % message)
message.cmd = 'message'
message.headers.setdefault('message-id', str(uuid.uuid4()))
bad_subscribers = set()
for subscriber in self._topics[dest]:
try:
subscriber.send_frame(message)
except:
self.log.exception(
"Error delivering message to subscriber %s; client will be disconnected." % subscriber)
# We queue for deletion so we are not modifying the topics dict
# while iterating over it.
bad_subscribers.add(subscriber)
for subscriber in bad_subscribers:
self.disconnect(subscriber)