Task Rules¶
A Task/Operator usually has some rules which describe when and how it should take action. A Task Rule consists of three parts:
Event - it specifies the signal that triggers the invocation of the rule
Condition - it is a logical test that, if satisfied or evaluates to be true, causes the action to be carried out
Action - START, STOP or RESTART the task
In a Workflow, those Tasks that do not have rules that Action is START will be executed as long as the Workflow starts.
During the execution of those Tasks that run first, some Events would be generated to trigger the other Tasks to run.
Next, we will go deep into some types of Rules to help thoroughly understand them.
Status Rules¶
The most common Task Rule is that one task runs after the other tasks succeed, users can add such Rules by calling start_after API.
from ai_flow.model.workflow import Workflow
from ai_flow.operators.bash import BashOperator
with Workflow(name='my_workflow') as workflow:
task1 = BashOperator(name='task_1',
bash_command='echo I am the 1st task')
task2 = BashOperator(name='task_2',
bash_command='echo I am the 2nd task')
task2.start_after([task1, ])
task1 has no Rules that Action is START so it would execute first, and task2 will start running after task1 succeed.
More generally, a task may perform other actions after more than one task is finished with any status, users can add such Rules by calling action_on_task_status API.
from ai_flow.model.action import TaskAction
from ai_flow.model.status import TaskStatus
from ai_flow.model.workflow import Workflow
from ai_flow.operators.bash import BashOperator
with Workflow(name='my_workflow') as workflow:
task1 = BashOperator(name='task_1',
bash_command='sleep 10')
task2 = BashOperator(name='task_2',
bash_command='sleep 20')
task3 = BashOperator(name='task_3',
bash_command='sleep 100')
task3.action_on_task_status(action=TaskAction.STOP,
upstream_task_status_dict={
task1: TaskStatus.SUCCESS,
task2: TaskStatus.SUCCESS
})
Since all 3 tasks have no Rules that Action is START so they will execute once the workflow starts, but task3 won’t finish after 100 seconds, instead it will be stopped when both task1 and task2 succeed.
Single Event Rules¶
Another commonly used Task Rule is that one task takes actions after receiving an event, users can add such Rules by calling action_on_event_received API.
import time
from ai_flow.model.action import TaskAction
from ai_flow.model.workflow import Workflow
from ai_flow.notification.notification_client import AIFlowNotificationClient
from ai_flow.operators.bash import BashOperator
from ai_flow.operators.python import PythonOperator
def func():
time.sleep(5)
print('I am the 1st task')
notification_client = AIFlowNotificationClient("localhost:50052")
notification_client.send_event(key="key",
value="")
with Workflow(name='quickstart_workflow') as workflow:
task1 = PythonOperator(name='task1', python_callable=func)
task2 = BashOperator(name='task2', bash_command='echo I am the 2nd task.')
task2.action_on_event_received(action=TaskAction.START, event_key="key")
task1 would send a custom event that triggers task2 to start running.
Custom Rules¶
Sometimes users may want to take action on tasks only when they receive multiple events or satisfy more complex conditions.
In those scenarios, users can add custom Task Rules by calling action_on_condition API, e.g. in the below example, task1 sends an event with a number and task2 would be triggered when the number adds up to 100.
import random
import time
from notification_service.model.event import Event
from ai_flow.model.action import TaskAction
from ai_flow.model.condition import Condition
from ai_flow.model.context import Context
from ai_flow.model.state import ValueState, ValueStateDescriptor
from ai_flow.model.workflow import Workflow
from ai_flow.notification.notification_client import AIFlowNotificationClient
from ai_flow.operators.bash import BashOperator
from ai_flow.operators.python import PythonOperator
class NumCondition(Condition):
def is_met(self, event: Event, context: Context) -> bool:
state: ValueState = context.get_state(ValueStateDescriptor(name='num_state'))
num = 0 if state.value() is None else int(state.value())
num += int(event.value)
if num >= 100:
return True
else:
state.update(num)
return False
def random_produce():
notification_client = \
AIFlowNotificationClient(server_uri='localhost:50052')
while True:
num = random.randint(0, 9)
notification_client.send_event(key='num_event', value=str(num))
time.sleep(1)
with Workflow(name='condition_workflow') as workflow:
task1 = PythonOperator(name='producer',
python_callable=random_produce)
task2 = BashOperator(name='consumer',
bash_command='echo Got 100 records.')
task2.action_on_condition(action=TaskAction.START,
condition=NumCondition(expect_event_keys=['num_event']))