Source code for experimentor.core.pusher
"""
Pusher
======
.. versionadded:: 0.2.0
Half the ZMQ implementation is abut broadcasting information from a publisher to different subscribers. However, the
other half is giving information to the publisher to broadcast. We are doing this with a PUSH/PULL pattern. The
pusher is therefore able to send information to the Publisher to then broadcast. There can be many instances of
pushers, but only one publisher. In other words, this is a fan-in type of architecture.
"""
import atexit
from threading import RLock
from time import sleep
import numpy as np
import zmq
from experimentor.config import settings
from experimentor.lib.log import get_logger
logger = get_logger(__name__)
[docs]class Pusher:
"""
The Pusher is class that wraps some common methods of the ZMQ PUSH/PULL architecture.
.. warning:: The main problem with this pattern is that if there is not PULL on the other side, a queue will build
up on the PUSH side. This happens if, for example, we close the publisher but we keep generating data.
Eventually the queue will outgrow the memory and the computer will crash.
Parameters
----------
port: int
The port on which to connect the PUSH end. If not specified, it will grab the default value from settings
Attributes
----------
pusher: socket
The socket where the communication happens
i: int
The number of messages that were pushed from a given
topic_i: dict
Number of data frames sent on each topic. For example: topic_i['topic']
lock: RLOCK
In case the same pusher is shared between different threads, this ensures the messages are sent in the proper
block
"""
def __init__(self, port=None):
self.lock = RLock()
context = zmq.Context()
self.pusher = context.socket(zmq.PUSH)
self.pusher.connect(f"tcp://127.0.0.1:{port or settings.PUBLISHER_PULL_PORT}")
sleep(1)
self.i = 0
self.topic_i = {}
atexit.register(self.finish)
[docs] def publish(self, data, topic=""):
"""Publish data on a given topic. This is the core of the Pusher object.
Parameters
----------
data
Data can be any Python object, provided that it is serializable
topic: str
The topic on which the data is being transmitted. If nothing is specified, it will be a broad transmission,
meaning that every subscriber will receive it.
"""
with self.lock:
if topic in self.topic_i:
self.topic_i[topic] += 1
else:
self.topic_i.update({topic: 1})
if settings.PUBLISHER_READY:
self.pusher.send_string(topic, zmq.SNDMORE)
if isinstance(data, np.ndarray):
meta_data = dict(
numpy=True,
dtype=str(data.dtype),
shape=data.shape,
i=self.topic_i[topic]
)
self.pusher.send_json(meta_data, 0 | zmq.SNDMORE)
self.pusher.send(data, 0, copy=True, track=False)
else:
meta_data = dict(
numpy=False
)
self.pusher.send_json(meta_data, 0 | zmq.SNDMORE )
self.pusher.send_pyobj(data)
self.i += 1
[docs] def finish(self):
with self.lock:
logger.info('Finishing Pusher')
self.pusher.close()
def __exit__(self, exc_type, exc_val, exc_tb):
self.finish()
def __enter__(self):
return self