Write Tasks
Director is a wrapper around Celery, so creating tasks with it is almost the same as creating tasks for pure Celery.
Create a task
In pure Celery you have to create a Celery application object (app = Celery(...)) and use the app.task() decorator to transform Python functions into Celery tasks.
This work has already been done for you in Director, so you just have to transform your function using the director.task decorator:
# tasks/example.py
from director import task

@task(name="TASK_EXAMPLE")
def my_task(*args, **kwargs):
    pass
Warning
The name parameter in the task decorator is mandatory. Because it will be used in the YAML file to combine tasks into workflows, this name must be unique.
Task signature
To simplify task creation, and to allow multiple workflows to reuse the same task, the signature is always the same: (*args, **kwargs).
The kwargs dictionary can be used to handle the payload, while args contains the results of the task's parents. If your task is at the beginning of a workflow there is no parent result to receive, and args is (None,), as the NO_PARENT example below shows.
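As a rough illustration of reading the payload, here is a minimal sketch. It uses a plain function so that it runs without Director installed; in a real project the function would carry the @task(name=...) decorator. The function name greet and the "payload" key inside kwargs are assumptions made for this example, not something this page specifies.

```python
# Plain function for illustration; in a Director project it would be
# decorated with @task(name=...).
def greet(*args, **kwargs):
    # Assumption: the workflow payload is delivered under the "payload" key.
    payload = kwargs.get("payload", {})
    name = payload.get("name", "world")
    return {"result": f"hello {name}"}


# Direct call standing in for a workflow execution.
greeting = greet(payload={"name": "director"})
print(greeting)
```

Using kwargs.get() with defaults keeps the task usable in workflows that send no payload at all.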
Technical explanation
In Celery, the developer decides whether a task receives the result of its parents by choosing between two methods: s() and si(). The i stands for immutable: an immutable signature ignores the parents' results. So normally, as a developer, you have to be careful about which method you use, and you have to write your task signatures accordingly.
But Director has been created to simplify that! As we decided to receive the results of the parents in the args parameter, we always use the s() method.
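The difference between the two methods can be sketched with a toy model in plain Python. This is not Celery code: Signature, s, si and run_chain are hypothetical stand-ins written for this example, and they only mimic how a chain forwards (or does not forward) a parent's result.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class Signature:
    """Hypothetical stand-in for a Celery signature."""
    func: Callable[..., Any]
    kwargs: dict
    immutable: bool = False


def s(func, **kwargs):
    # Mutable signature: accepts the parent's result as a positional argument.
    return Signature(func, kwargs, immutable=False)


def si(func, **kwargs):
    # Immutable signature: ignores the parent's result.
    return Signature(func, kwargs, immutable=True)


def run_chain(*signatures):
    """Run the signatures in order, forwarding results to mutable ones."""
    result = None
    for index, sig in enumerate(signatures):
        if index == 0 or sig.immutable:
            result = sig.func(**sig.kwargs)
        else:
            result = sig.func(result, **sig.kwargs)
    return result


def a(*args, **kwargs):
    return {"result": "a_data"}


def c(*args, **kwargs):
    return args


received_with_s = run_chain(s(a), s(c))    # c sees its parent's result
received_with_si = run_chain(s(a), si(c))  # c sees nothing
print(received_with_s, received_with_si)
```

Because every task accepts (*args, **kwargs), the same function works in both cases, which is why a single signature style is enough once s() is always used.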
Here are some concrete examples based on the following tasks:
# tasks/example.py
from director import task

@task(name="A")
def a(*args, **kwargs):
    return {"result": "a_data"}

@task(name="B")
def b(*args, **kwargs):
    return {"result": "b_data"}

@task(name="C")
def c(*args, **kwargs):
    print(args)
The following workflows present different use cases and the output of the C task (see the Build Workflows guide to understand the YAML format):
example.NO_PARENT:
  tasks:
    - C
# Result : (None,)

example.ONE_PARENT:
  tasks:
    - A
    - B
    - C
# Result : ({'result': 'b_data'},)

example.MULTIPLE_PARENT:
  tasks:
    - GROUP_1:
        type: group
        tasks:
          - A
          - B
    - C
# Result : ([{'result': 'a_data'}, {'result': 'b_data'}],)
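Since args takes three different shapes in the results above, a task reused across workflows may want to normalize them first. The helper below is one possible sketch; parent_results is a hypothetical name, not part of Director or Celery.

```python
def parent_results(args):
    """Return the parents' results as a list, whatever the workflow shape."""
    # No parent: args is empty or holds a single None.
    if not args or args[0] is None:
        return []
    first = args[0]
    # Group parent: the results arrive as one sequence.
    if isinstance(first, (list, tuple)):
        return list(first)
    # Single parent: wrap its result so callers always get a list.
    return [first]


no_parent = parent_results((None,))
one_parent = parent_results(({"result": "b_data"},))
multiple_parents = parent_results(([{"result": "a_data"}, {"result": "b_data"}],))
print(no_parent, one_parent, multiple_parents)
```

A task such as C could call parent_results(args) at the top of its body and then loop over the list without caring which workflow invoked it.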
Bound Tasks
Celery allows us to bind a task, providing the task instance itself as the first parameter.
In this case the signature must contain a first parameter just before args and kwargs:
# tasks/example.py
from director import task

@task(bind=True, name="BOUND_TASK")
def bound_task(self, *args, **kwargs):
    print(self.name)
Celery Task Options
The task() decorator provided by Director is just a wrapper around the native app.task() decorator provided by Celery, so all the original options are still available.
You can, for example, apply a rate_limit or configure the maximum number of retries.