Source code for experimentor.core.subscriber_process

"""
    Subscriber
    ==========
    Example script on how to run separate processes to process the data coming from a publisher like the one on
    ``publisher.py``. The first process just grabs the frame and puts it in a Queue. The Queue is then used by
    another process in order to analyse, process, save, etc. It has to be noted that on UNIX systems, getting
    from a queue with ``Queue.get()`` is particularly slow, much slower than serializing a numpy array with
    cPickle.

    .. Warning::
        This is work in process. On Windows, since processes are spawned, the subscriber would not work as expected.
        That is why we work with Threads instead.
"""
from multiprocessing import Process
from time import sleep

import numpy as np
import zmq

from experimentor.config import settings
from experimentor.core.meta import ExperimentorProcess
from experimentor.core.pusher import Pusher
from experimentor.lib.log import get_logger


[docs]class Subscriber(ExperimentorProcess): def __init__(self, func, topic, publish_topic=None, args=None, kwargs=None): super(Subscriber, self).__init__() self.func = func self.topic = topic self.publish_topic = publish_topic self.args = args self.kwargs = kwargs self.logger = get_logger() self.logger.info(f'Starting subscriber for {func.__name__} on topic {topic}')
[docs] def run(self): context = zmq.Context() socket = context.socket(zmq.SUB) socket.connect(f"tcp://localhost:{settings.PUBLISHER_PUBLISH_PORT}") if self.publish_topic: listener = Pusher() topic_filter = self.topic.encode('utf-8') socket.setsockopt(zmq.SUBSCRIBE, topic_filter) self.logger.info(f'subscriber for {self.func.__name__} on topic {self.topic} ready') while not settings.GENERAL_STOP_EVENT.is_set(): topic = socket.recv_string() self.logger.debug(f"Got data on topic {topic}") metadata = socket.recv_json(flags=0) if metadata.get('numpy', False): msg = socket.recv(flags=0, copy=True, track=False) buf = memoryview(msg) data = np.frombuffer(buf, dtype=metadata['dtype']) data = data.reshape(metadata['shape']).copy() else: data = socket.recv_pyobj() # flags=0, copy=True, track=False) if isinstance(data, str): if data == settings.SUBSCRIBER_EXIT_KEYWORD: self.logger.info(f'Stopping Subscriber {self}') break ans = self.func(data)#, *self.args, **self.kwargs) if self.publish_topic: listener.publish(ans, self.publish_topic) sleep(1) # Gives enough time for the publishers to finish sending data before closing the socket socket.close()
[docs] def stop(self): with Pusher() as pusher: pusher.publish(settings.SUBSCRIBER_EXIT_KEYWORD) self.join()
def __str__(self): return f"Subscriber {self.func.__name__}" def __repr__(self): return f"<Subscriber {self.func.__name__}>"