ai_flow.model package

Subpackages

Submodules

ai_flow.model.action module

class ai_flow.model.action.TaskAction(value)[source]

Bases: str, enum.Enum

Enumeration of execution commands for scheduled tasks. START: Start a task instance. RESTART: Stop the current task instance and start a new task instance. STOP: Stop a task instance.

RESTART = 'RESTART'
START = 'START'
STOP = 'STOP'

ai_flow.model.condition module

class ai_flow.model.condition.Condition(expect_event_keys: List[str])[source]

Bases: object

Conditions that trigger scheduling.

Parameters

expect_event_keys – The keys of events that this condition depends on.

abstract is_met(event: notification_service.model.event.Event, context: ai_flow.model.context.Context)bool[source]

Determine whether the condition is met. :param event: The currently processed event. :param context: The context in which the condition is executed. :return True:The condition is met. False: The condition is not met.

ai_flow.model.context module

class ai_flow.model.context.Context[source]

Bases: object

The context in which custom logic is executed.

get_state(state_descriptor: ai_flow.model.state.StateDescriptor)ai_flow.model.state.State[source]

Get the State object. :param state_descriptor: Description of the State object. :return The State object.

get_task_status(task_name)ai_flow.model.status.TaskStatus[source]

Get the task status by task name. :param task_name: The name of the task.

ai_flow.model.execution_type module

class ai_flow.model.execution_type.ExecutionType(value)[source]

Bases: str, enum.Enum

Enumeration of execution of workflow and task. MANUAL: Manually trigger execution. EVENT: Event triggered execution. PERIODIC: Periodic triggered execution.

EVENT = 'EVENT'
MANUAL = 'MANUAL'
PERIODIC = 'PERIODIC'

ai_flow.model.operator module

class ai_flow.model.operator.AIFlowOperator(task_name: str, **kwargs)[source]

Bases: ai_flow.model.operator.Operator

AIFlowOperator is a template that defines a task, it defines AIFlow’s native Operator interface. To derive this class, you are expected to override the constructor as well as abstract methods.

Parameters
  • name – The operator’s name.

  • kwargs – Operator’s extended parameters.

await_termination(context: ai_flow.model.context.Context, timeout: Optional[int] = None)[source]

Wait for a task instance to finish. :param context: The context in which the operator is executed. :param timeout: If timeout is None, wait until the task ends.

If timeout is not None, wait for the task to end or the time exceeds timeout(seconds).

get_metrics(context: ai_flow.model.context.Context)Dict[source]

Get the metrics of a task instance.

abstract start(context: ai_flow.model.context.Context)[source]

Start a task instance.

stop(context: ai_flow.model.context.Context)[source]

Stop a task instance.

class ai_flow.model.operator.Operator(name: str, **kwargs)[source]

Bases: object

Operator is a template that defines a task. It is the abstract base class for all operators. Since operators create objects that become tasks in the Workflow.To derive this class, you are expected to override the constructor method. This class is abstract and shouldn’t be instantiated. Instantiating a class derived from this one results in the creation of a task object, which ultimately becomes a task in Workflow objects.

Parameters
  • name – The operator’s name.

  • kwargs – Operator’s extended parameters.

action_on_condition(action: ai_flow.model.action.TaskAction, condition: ai_flow.model.condition.Condition)[source]

Schedule the task based on a specified condition. :param action: The action for scheduling the task. :param condition: The condition for scheduling the task to depend on.

action_on_event_received(action: ai_flow.model.action.TaskAction, event_key: str)[source]

When the specified event is received, the task is scheduled. :param action: The action for scheduling the task. :param event_key: The event for scheduling the task to depend on.

action_on_task_status(action: ai_flow.model.action.TaskAction, upstream_task_status_dict: Dict[ai_flow.model.operator.Operator, ai_flow.model.status.TaskStatus])[source]

Schedule the task based on the status of upstream tasks. :param action: The action for scheduling the task. :param upstream_task_status_dict: The upstream task status for scheduling the task to depend on.

start_after(tasks: Union[ai_flow.model.operator.Operator, List[ai_flow.model.operator.Operator]])[source]

Start the task after upstream tasks succeed. :param tasks: The upstream tasks.

class ai_flow.model.operator.OperatorConfigItem[source]

Bases: object

The Operator’s config items. PERIODIC_EXPRESSION: The expression for the periodic task.

PERIODIC_EXPRESSION = 'periodic_expression'

ai_flow.model.rule module

class ai_flow.model.rule.TaskRule(condition: ai_flow.model.condition.Condition, action: ai_flow.model.action.TaskAction)[source]

Bases: object

Rules that trigger task scheduling.

Parameters
  • condition – Trigger condition of the rule.

  • action – The execution commands for scheduled tasks.

trigger(event: notification_service.model.event.Event, context: ai_flow.model.context.Context)Optional[ai_flow.model.action.TaskAction][source]

Determine whether to trigger task scheduling behavior. :param event: The currently processed event. :param context: The context in which the rule is executed. :return None: Does not trigger task scheduling behavior.

Not None: Execution command for scheduling the task.

class ai_flow.model.rule.WorkflowRule(condition: ai_flow.model.condition.Condition)[source]

Bases: object

Rules that trigger workflow scheduling.

Parameters

condition – Trigger condition of the rule.

trigger(event: notification_service.model.event.Event, context: ai_flow.model.context.Context)bool[source]

Determine whether to trigger workflow running. :param event: The currently processed event. :param context: The context in which the rule is executed. :return True:Start a WorkflowExecution. False: Do not start a WorkflowExecution.

ai_flow.model.state module

class ai_flow.model.state.State[source]

Bases: object

User-defined state

clear()[source]

Clean up user-defined state

class ai_flow.model.state.StateDescriptor(name)[source]

Bases: object

User-defined state description

class ai_flow.model.state.StateType[source]

Bases: object

VALUE = 'value'
class ai_flow.model.state.ValueState[source]

Bases: ai_flow.model.state.State

Single-valued user-defined state

update(state)[source]

Update the single-valued user-defined state’s value

value()object[source]

Get the single-valued user-defined state’s value

class ai_flow.model.state.ValueStateDescriptor(name)[source]

Bases: ai_flow.model.state.StateDescriptor

Single-valued user-defined state description

ai_flow.model.status module

class ai_flow.model.status.TaskStatus(value)[source]

Bases: str, enum.Enum

Enumeration of TaskExecution’s status. INIT: The initial status of TaskExecution. QUEUED: The TaskExecution has been assigned to an executor. RESTARTING: The TaskExecution was requested to restart when it was running RUNNING: The TaskExecution is running. SUCCESS: The TaskExecution finished running without errors. FAILED: The TaskExecution had errors during execution and failed to run. KILLING: The TaskExecution was externally requested to shut down when it was running. KILLED: The TaskExecution was externally shut down. RETRYING: The TaskExecution failed, but has retry attempts left and will be rescheduled.

FAILED = 'FAILED'
INIT = 'INIT'
QUEUED = 'QUEUED'
RETRYING = 'RETRYING'
RUNNING = 'RUNNING'
STOPPED = 'STOPPED'
STOPPING = 'STOPPING'
SUCCESS = 'SUCCESS'
class ai_flow.model.status.WorkflowStatus(value)[source]

Bases: str, enum.Enum

Enumeration of WorkflowExecution’s status. INIT: The initial status of WorkflowExecution. RUNNING: The WorkflowExecution is running. SUCCESS: The WorkflowExecution finished running without errors. FAILED: The WorkflowExecution had errors during execution and failed to run. STOPPED: The WorkflowExecution has been stopped.

FAILED = 'FAILED'
INIT = 'INIT'
RUNNING = 'RUNNING'
STOPPED = 'STOPPED'
SUCCESS = 'SUCCESS'

ai_flow.model.task_execution module

class ai_flow.model.task_execution.TaskExecution(workflow_execution_id: int, task_name: str, sequence_number: int, execution_type: ai_flow.model.execution_type.ExecutionType, begin_date: Optional[datetime.datetime] = None, end_date: Optional[datetime.datetime] = None, status: ai_flow.model.status.TaskStatus = <TaskStatus.INIT: 'INIT'>, id: Optional[int] = None)[source]

Bases: object

TaskExecution describes an instance of a task. It can be created by the scheduler.

Parameters
  • workflow_execution_id – TaskExecution belongs to the unique identifier of WorkflowExecution.

  • task_name – The name of the task it belongs to.

  • sequence_number – A task in a WorkflowExecution can be run multiple times, it indicates how many times this task is run.

  • execution_type – The type that triggers TaskExecution.

  • begin_date – The time TaskExecution started executing.

  • end_date – The time TaskExecution ends execution.

  • status – TaskExecution’s current status.

  • id – Unique ID of TaskExecution.

class ai_flow.model.task_execution.TaskExecutionKey(workflow_execution_id, task_name, seq_num)[source]

Bases: object

ai_flow.model.workflow module

class ai_flow.model.workflow.Workflow(name: str, namespace: str = 'default', **kwargs)[source]

Bases: object

Workflow is a collection of tasks and trigger rules. A Workflow can be scheduled by events, manual or schedule. For each execution, the workflow needs to run its individual tasks when their triggering rules are met. Workflows essentially act as namespaces for tasks. A task_id can only be added once to a Workflow.

Parameters

name – The name of the workflow.

action_on_condition(task_name: str, action: ai_flow.model.action.TaskAction, condition: ai_flow.model.condition.Condition)[source]
action_on_event_received(task_name: str, event_key: str, action: ai_flow.model.action.TaskAction)[source]
action_on_task_status(task_name: str, action: ai_flow.model.action.TaskAction, upstream_task_status_dict: Dict[ai_flow.model.operator.Operator, ai_flow.model.status.TaskStatus])[source]
class ai_flow.model.workflow.WorkflowContextManager[source]

Bases: object

Workflow context manager is used to keep the current Workflow when Workflow is used as ContextManager. You can use Workflow as context: .. code-block:: python

with Workflow(

name = ‘workflow’

) as workflow:

If you do this the context stores the Workflow and whenever new task is created, it will use such Workflow as the parent Workflow.

classmethod get_current_workflow()Optional[ai_flow.model.workflow.Workflow][source]
classmethod pop_context_managed_workflow()Optional[ai_flow.model.workflow.Workflow][source]
classmethod push_context_managed_workflow(workflow: ai_flow.model.workflow.Workflow)[source]

ai_flow.model.workflow_execution module

class ai_flow.model.workflow_execution.WorkflowExecution(workflow_id, execution_type: ai_flow.model.execution_type.ExecutionType, begin_date: datetime.datetime, end_date: datetime.datetime, status: ai_flow.model.status.WorkflowStatus, id: Optional[int] = None)[source]

Bases: object

WorkflowExecution describes an instance of a Workflow. It can be created by the scheduler.

Parameters
  • workflow_id – WorkflowExecution belongs to the unique identifier of Workflow.

  • execution_type – The type that triggers WorkflowExecution.

  • begin_date – The time WorkflowExecution started executing.

  • end_date – The time WorkflowExecution ends execution.

  • status – WorkflowExecution’s current status.

  • id – Unique ID of WorkflowExecution.