Hello, Habr! I will tell you the story of my professional burnout.
It so happens that I cannot stand the routine of monotonous actions. I have several projects that use Celery. Every time a task becomes more complicated than 2 + 2 = 5, the solution boils down to the same template: a class that performs the task, plus a starter function that Celery can work with. In other words, boilerplate. In this article I will tell you how I fought the boilerplate and what came of it.
The starting point
Consider an ordinary Celery task. There is a class that performs the task, and a starter function that instantiates the class and calls one of its methods. That method implements all the logic of the task, while error handling is inherited:
class MyTask(
    FirstMixin,
    SecondMixin,
    ThirdMixin,
):
    def main(self):
        data = self.do_something()
        response = self.remote_call(data)
        parsed = self.parser(response)
        return self.process(parsed)


@app.task(bind=True)
def my_task(self, arg1, arg2):
    instance = MyTask(
        celery_task=self,
        arg1=arg1,
        arg2=arg2,
    )
    return instance.full_task()
Here the full_task method includes a call to main, but it also takes care of error handling, logging and other chores that are not directly related to the main task.
Taskclass idea
At the root of the taskclass lies a simple idea: in the base class you can define a class method named task, implement the behavior of the starter function in it, and then inherit:
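class BaseTask:
    def __init__(self, **kwargs):
        # Store every keyword argument as an instance attribute.
        for key, value in kwargs.items():
            setattr(self, key, value)

    def full_task(self):
        try:
            return self.main()
        except Exception:
            # Any failure in main() schedules a retry in 30 seconds.
            self.celery_task.retry(countdown=30)

    @classmethod
    def task(cls, celery_task, **kwargs):
        # Plays the role of the starter function: build the instance, run it.
        self = cls(
            celery_task=celery_task,
            **kwargs,
        )
        return self.full_task()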
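To see the starter behavior in isolation, here is a minimal sketch that drives the task class method without Celery or a broker. FakeCeleryTask, RetryRequested, AddTask and BrokenTask are hypothetical names invented for this illustration. The stub's retry raises an exception, which mirrors the default behavior of retry on a real Celery task, but the stub is in no way Celery's implementation.

```python
class RetryRequested(Exception):
    """Hypothetical stand-in for the exception a real retry call raises."""


class FakeCeleryTask:
    """Hypothetical stub that plays the role of the bound Celery task."""

    def __init__(self):
        self.retry_calls = []

    def retry(self, countdown):
        # Record the requested delay, then interrupt the task by raising.
        self.retry_calls.append(countdown)
        raise RetryRequested(countdown)


class BaseTask:
    def __init__(self, **kwargs):
        for key, value in kwargs.items():
            setattr(self, key, value)

    def full_task(self):
        try:
            return self.main()
        except Exception:
            self.celery_task.retry(countdown=30)

    @classmethod
    def task(cls, celery_task, **kwargs):
        self = cls(celery_task=celery_task, **kwargs)
        return self.full_task()


class AddTask(BaseTask):
    """Illustrative task whose main() succeeds."""

    def main(self):
        return self.arg1 + self.arg2


class BrokenTask(BaseTask):
    """Illustrative task whose main() always fails."""

    def main(self):
        raise ValueError("boom")


# Successful run: the class method builds the instance and returns main()'s result.
ok_result = AddTask.task(FakeCeleryTask(), arg1=2, arg2=3)

# Failing run: full_task() catches the error and asks the stub for a retry.
failing_stub = FakeCeleryTask()
try:
    BrokenTask.task(failing_stub)
    retried = False
except RetryRequested:
    retried = True
```

The subclasses contain only main; everything else comes from the base class.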
All the auxiliary tedium is now collected in the base class, and we will not return to it again. Let's implement the logic of the task:
@app.taskcls(bind=True)
class MyTask(
    BaseTask,
    FirstMixin,
    SecondMixin,
    ThirdMixin,
):
    def main(self):
        data = self.do_something()
        response = self.remote_call(data)
        parsed = self.parser(response)
        return self.process(parsed)
No more boilerplate, which is already much better. But what about the entry point?
MyTask.task.delay(...)
MyTask.task has all the usual task methods, such as delay and apply_async. Generally speaking, it is an ordinary task.
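One way to picture how a class decorator could turn the task class method into an object with delay and apply_async is sketched below. This is a toy model written for this article's example, not Celery's code: ToyTask and toy_taskcls are hypothetical names, and the toy runs everything synchronously in the current process, whereas a real task sends a message to a broker and returns a result handle.

```python
class ToyTask:
    """Hypothetical stand-in for a task object; runs the function immediately."""

    def __init__(self, run, bind=False):
        self.run = run
        self.bind = bind

    def __call__(self, *args, **kwargs):
        # With bind=True the task object itself is passed as the first argument.
        if self.bind:
            return self.run(self, *args, **kwargs)
        return self.run(*args, **kwargs)

    def delay(self, *args, **kwargs):
        # Toy version: execute right away instead of queueing.
        return self(*args, **kwargs)

    def apply_async(self, args=(), kwargs=None):
        # Toy version: same as delay, but with explicit args and kwargs.
        return self(*args, **(kwargs or {}))


def toy_taskcls(**options):
    """Hypothetical decorator: wrap the class's `task` class method in a ToyTask."""

    def decorator(cls):
        starter = cls.task  # class method already bound to cls
        cls.task = ToyTask(starter, **options)
        return cls

    return decorator


class BaseTask:
    def __init__(self, **kwargs):
        for key, value in kwargs.items():
            setattr(self, key, value)

    def full_task(self):
        return self.main()

    @classmethod
    def task(cls, celery_task, **kwargs):
        self = cls(celery_task=celery_task, **kwargs)
        return self.full_task()


@toy_taskcls(bind=True)
class MyTask(BaseTask):
    def main(self):
        return self.arg1 * self.arg2


result_delay = MyTask.task.delay(arg1=6, arg2=7)
result_async = MyTask.task.apply_async(kwargs={"arg1": 2, "arg2": 5})
```

The point of the sketch is only the shape of the entry point: the decorator replaces the class attribute, so callers go through MyTask.task and never touch a separate starter function.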
Now for the arguments of the decorator. Dragging bind=True into every single task is especially "fun". Is it possible to pass default arguments through the base class?
class BaseTask:
    class MetaTask:
        # Default decorator arguments shared by all child classes.
        bind = True

    def __init__(self, **kwargs):
        for key, value in kwargs.items():
            setattr(self, key, value)

    def full_task(self):
        try:
            return self.main()
        except Exception:
            self.celery_task.retry(countdown=30)

    @classmethod
    def task(cls, celery_task, **kwargs):
        self = cls(
            celery_task=celery_task,
            **kwargs,
        )
        return self.full_task()
The nested MetaTask class contains default arguments and will be available to all child classes. Interestingly, it can also be inherited:
class BaseHasTimeout(BaseTask):
    class MetaTask(BaseTask.MetaTask):
        timeout = 42
Arguments passed to the @app.taskcls decorator have the highest priority:
@app.taskcls(timeout=20)
class MyTask(
    BaseHasTimeout,
    FirstMixin,
    SecondMixin,
    ThirdMixin,
):
    def main(self):
        ...
As a result, the timeout for the task will be 20.
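The priority rule can be modeled in a few lines. The sketch below assumes that defaults are read from the public attributes of the nested MetaTask class along its inheritance chain, and that decorator arguments are applied on top. collect_meta_options and resolve_task_options are hypothetical helpers written for this illustration; how the real decorator gathers its options may differ.

```python
def collect_meta_options(cls, meta_name="MetaTask"):
    """Gather public attributes of the nested options class, bases first."""
    meta = getattr(cls, meta_name, None)
    if meta is None:
        return {}
    options = {}
    # Walk from the most basic class to the most derived one,
    # so that values from derived classes overwrite inherited ones.
    for klass in reversed(meta.__mro__):
        if klass is object:
            continue
        for name, value in vars(klass).items():
            if not name.startswith("_"):
                options[name] = value
    return options


def resolve_task_options(cls, **decorator_kwargs):
    """Defaults from MetaTask first, explicit decorator arguments last."""
    options = collect_meta_options(cls)
    options.update(decorator_kwargs)
    return options


class BaseTask:
    class MetaTask:
        bind = True


class BaseHasTimeout(BaseTask):
    class MetaTask(BaseTask.MetaTask):
        timeout = 42


class MyTask(BaseHasTimeout):
    pass


# Without decorator arguments the inherited defaults apply.
defaults = resolve_task_options(MyTask)

# An explicit argument overrides the value inherited from MetaTask.
overridden = resolve_task_options(MyTask, timeout=20)
```

Here defaults carries bind and the inherited timeout of 42, while overridden keeps bind and replaces the timeout with 20, matching the behavior described above.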
Going beyond
In web applications there is often a need to start a task from a view. When the view and the task are tightly coupled, they can be combined into one class:
class BaseViewTask:
    @classmethod
    def task(cls, **kwargs):
        ...
By the way, to avoid name collisions, the nested class is called MetaTask, not Meta as in Django.
Conclusion
This functionality is expected in Celery 4.5. However, I have also prepared a package that lets you try out the taskcls decorator today. The idea of the package is that when you upgrade Celery to version 4.5, you can remove its import without changing a single other line of code.