''' Created on 06.03.2016 @author: wn ''' import Queue import threading from logger import Logger class BrokerException(Exception): pass class BrokerDuplicateIdException(BrokerException): pass class BrokerNotSubscribedException(BrokerException): pass class BrokerOverflowException(BrokerException): pass class Broker(threading.Thread): def __init__(self): threading.Thread.__init__(self) self.setDaemon(True) self.outQueues = {} self.inQueue = Queue.Queue() def run(self): while True: try: item = self.inQueue.get(True, None) Logger.log("got an item") for (n, q) in self.outQueues.iteritems(): Logger.log("passed to %s" % n) q.put_nowait(item) except Queue.Full: pass def getInQueue(self): return self.inQueue def subscribe(self, name): if self.outQueues.has_key(name): raise BrokerDuplicateIdException() self.outQueues[name] = Queue.Queue() return self.outQueues[name] def unsubscribe(self, name): try: del self.outQueues[name] except KeyError: raise BrokerNotSubscribedException