Build Workflows
Director separates the tasks logic from the workflows definition by providing a simple YAML syntax.
Let's imagine the following tasks :
# tasks/example.py
from director import task
@task(name="A")
def a(*args, **kwargs):
pass
@task(name="B")
def b(*args, **kwargs):
pass
@task(name="C")
def c(*args, **kwargs):
pass
Chaining multiple tasks
Chaining these tasks in the workflows.yml
file is pretty simple :
# Chain example
#
# +-------+ +-------+ +-------+
# | A +----->+ B +----->+ C |
# +-------+ +-------+ +-------+
#
example.CHAIN:
tasks:
- A
- B
- C
In this example each task will be executed one after the other : first the task A will be executed, then the task B and finally the task C.
Launch tasks in parallel
Sometimes you need to execute some tasks in parallel to improve your workflow performance.
The type: group
keywords can be used to handle this canvas :
# Group example
# +-------+
# +-->+ B |
# +-------+ | +-------+
# + A +--+
# +-------+ | +-------+
# +-->+ C |
# +-------+
example.GROUP:
tasks:
- A
- GROUP_1:
type: group
tasks:
- B
- C
In this example the group is named GROUP_1 but it can be anything. The important is to keep unique names in case of multiple groups in your workflow.
Periodic workflows
Celery provides a scheduler used to periodically execute some tasks. This scheduler is named the Celery beat.
Director allows you to periodically schedule a whole workflow using a simple YAML syntax.
You can use the periodic > interval
key with a number argument (in seconds):
You can also use the periodic > crontab
key with a string argument:
The format is the following (the official documentation of the crontab
function gives some examples of each attribute):
So in the first example, the example.CHAIN workflow will be executed every 60 seconds and the second one, example.CHAIN_CRONTAB, every three hours.
Warning
Older versions of Celery Director (< 0.4.0) used the periodic > schedule
key which is now deprecated.
It is strongly advised to migrate to periodic > interval
or periodic > crontab
keys to set up a scheduled workflow.
How to migrate:
- If your schedule is in seconds: just rename
schedule
tointerval
; -
If your schedule is a crontab: rename
schedule
tocrontab
.Be careful when migrating from
schedule
tocrontab
, there is also a fix to apply. The oldschedule
syntax incorrectly parses the string asminute hour day_of_week day_of_month month_of_year
but thecrontab
syntax properly parses the string asminute hour day_of_month month_of_year day_of_week
.For example:
0 12 1 * *
should become0 12 * * 1
for a scheduled workflow at 12:00 UTC on Monday.
Please note that the scheduler must be started to handle periodic workflows :
Tip
Celery also accepts the -B
option when launching a worker :
This way you can start your worker and scheduler instances using a single command.
Please note this option is only to use during your development, otherwise use the celery
beat
command.
Use of queues in Workflows
With director, you can set queues for workflows. All workflow's tasks will use the same queue:
You need the start Celery worker instance with the --queues
option: