Source code for experimentor.models.decorators
"""
Decorators
==========
Useful decorators for models.
:license: MIT, see LICENSE for more details
:copyright: 2020 Aquiles Carattino
"""
from functools import wraps
from threading import Thread
from experimentor.lib.log import get_logger
from experimentor.models.models import BaseModel
[docs]def not_implemented(func):
""" Raises a warning in the logger in case the method was not implemented by child classes, but it does not prevent
the program from running.
"""
@wraps(func)
def func_wrapper(cls, *args, **kwargs):
logger = get_logger(__name__)
logger.warning(f'{func.__name__} from {cls} not Implemented')
return func(cls, *args, **kwargs)
return func_wrapper
[docs]def make_async_thread(func):
""" Simple decorator to make a method run on a separated thread. This decorator will not work on simple
functions, since it requires the first argument to be an instantiated class (self).
It will store the method in an attribute of the class, called `_threads``, or it will create it if it does not
exist yet.
TODO: Check what happens with the _thread list and inherited classes. Is there a risk that the list will be
shared? If the list is defined as a class attribute instead of an object attribute, most likely it will. If
it is defined outside of the scope and then linked to the class, also.
.. warning:: In complex scenarios, this simple decorator can give raise to mistakes, i.e. objects having access
to other objects threads.
.. TODO: May be wise to use this decorator only with certain models, and define a method directly in them to
manipulate the threads, avoiding the inherent problems of mutable types.
"""
@wraps(func)
def func_wrapper(*args, **kwargs):
logger = get_logger(name=__name__)
logger.info('Starting new thread for {}'.format(func.__name__))
if isinstance(args[0], BaseModel):
args[0].clean_up_threads()
if not hasattr(args[0], '_threads'):
args[0]._threads = []
elif not isinstance(args[0]._threads, list):
raise ValueError('The variable _threads must be a list in order to store a new Thread in it')
args[0]._threads.append([func.__name__, Thread(target=func, args=args, kwargs=kwargs)])
args[0]._threads[-1][1].start()
logger.info('In total there are {} threads'.format(len(args[0]._threads)))
return func_wrapper
[docs]def avoid_repeat(func):
@wraps(func)
def func_wrapper(cls, *args, **kwargs):
logger = get_logger(name=__name__)
logger.info(f'Checking if function {func.__name__} was already triggered')
update = False
if hasattr(cls, 'cache_setters'):
if len(args) >= 1:
for i, arg in enumerate(args[1:]):
old_arg = args[0].cache_setters.get('args')[i]
if arg != old_arg:
update = True
if len(kwargs) > 1:
for key, value in kwargs.items():
old_value = args[0].cache_setters.get('kwargs')[key]
if value != old_value:
update = True
else:
cls.cache_setters = dict()
update = True
if update:
cls.cache_setters.update({'args': args, 'kwargs': kwargs})
logger.info('Updating values')
return func
else:
logger.info('Nothing to update')
return lambda *args, **kwargs: None
return func_wrapper