Source code for experimentor.core.meta

import weakref
from multiprocessing.context import Process
from threading import Thread

from experimentor.lib.log import get_logger


[docs]class MetaProcess(type): """ Meta Class that should be shared by all processes in order to be sure they all switch off nicely when done. """ def __init__(cls, name, bases, attrs): # Create class super(MetaProcess, cls).__init__(name, bases, attrs) # Initialize fresh instance storage cls._instances = weakref.WeakSet() def __call__(cls, *args, **kwargs): # Create instance (calls __init__ and __new__ methods) proc = super(MetaProcess, cls).__call__(*args, **kwargs) # Store weak reference to instance. WeakSet will automatically remove # references to objects that have been garbage collected cls._instances.add(proc) return proc
[docs] def get_instances(cls, recursive=False): """Get all instances of this class in the registry. If recursive=True search subclasses recursively""" instances = list(cls._instances) if recursive: for Child in cls.__subclasses__(): instances += Child.get_instances(recursive=recursive) # Remove duplicates from multiple inheritance. return list(set(instances))
[docs]class ExperimentorProcess(Process, metaclass=MetaProcess): def __init__(self, *args, **kwargs): super(ExperimentorProcess, self).__init__() self.logger = get_logger()
[docs]class ExperimentorThread(Thread, metaclass=MetaProcess): def __init__(self, *args, **kwargs): super(ExperimentorThread, self).__init__(args, kwargs) self.logger = get_logger()