Source code for experimentor.core.subscriber

"""
    Subscriber
    ==========
    Example script on how to run separate processes to process the data coming from a publisher like the one on
    ``publisher.py``. The first process just grabs the frame and puts it in a Queue. The Queue is then used by
    another process in order to analyse, process, save, etc. It has to be noted that on UNIX systems, getting
    from a queue with ``Queue.get()`` is particularly slow, much slower than serializing a numpy array with
    cPickle.
"""
from threading import Thread
from time import sleep

import numpy as np
import zmq

from experimentor.config import settings
from experimentor.core.meta import MetaProcess
from experimentor.core.pusher import Pusher
from experimentor.lib.log import get_logger

logger = get_logger(__name__)


[docs]class Subscriber(Thread, metaclass=MetaProcess): def __init__(self, func, url, topic): super(Subscriber, self).__init__() logger.info(f'Starting subscriber for {func.__name__} on topic {topic}') self.func = func context = zmq.Context() self.socket = context.socket(zmq.SUB) self.socket.connect(url) self.socket.setsockopt(zmq.SUBSCRIBE, b"") # topic.encode('utf-8')) self.start()
[docs] def run(self): while not settings.GENERAL_STOP_EVENT.is_set(): event = self.socket.poll(0) if not event: sleep(.005) continue topic = self.socket.recv_string() logger.debug(f"Got data on topic {topic}") metadata = self.socket.recv_json(flags=0) if metadata.get('numpy', False): msg = self.socket.recv(flags=0, copy=True, track=False) buf = memoryview(msg) data = np.frombuffer(buf, dtype=metadata['dtype']) data = data.reshape(metadata['shape']).copy() else: data = self.socket.recv_pyobj() # flags=0, copy=True, track=False) if isinstance(data, str): if data == settings.SUBSCRIBER_EXIT_KEYWORD: logger.info(f'Stopping Subscriber {self}') break self.func(data)#, *self.args, **self.kwargs) sleep(1) # Gives enough time for the publishers to finish sending data before closing the socket self.socket.close()
[docs] def stop(self): with Pusher() as pusher: pusher.publish(settings.SUBSCRIBER_EXIT_KEYWORD, self.topic) self.join()
def __str__(self): return f"Subscriber {self.func.__name__}" def __repr__(self): return f"<Subscriber {self.func.__name__}>"