ai_flow.model package
Submodules
ai_flow.model.action module
class ai_flow.model.action.TaskAction(value)
Bases: str, enum.Enum
Enumeration of execution commands for scheduled tasks.
START: Start a task instance.
RESTART: Stop the current task instance and start a new task instance.
STOP: Stop a task instance.
- RESTART = 'RESTART'
- START = 'START'
- STOP = 'STOP'
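Because TaskAction derives from both str and enum.Enum, its members behave like plain strings in comparisons and serialization. The sketch below illustrates this with a local stand-in enum that copies the documented members, so it runs without ai_flow installed; in real code the class would be imported from ai_flow.model.action.

```python
import json
from enum import Enum


class TaskAction(str, Enum):
    """Local stand-in mirroring the documented members of TaskAction."""
    START = 'START'
    RESTART = 'RESTART'
    STOP = 'STOP'


# Look a member up from its string value, e.g. after reading it from a config file.
action = TaskAction('RESTART')

# Members of a str-mixin enum compare equal to plain strings...
is_restart = action == 'RESTART'

# ...and are serialized by the json module as ordinary strings.
payload = json.dumps({'action': TaskAction.STOP})
```

The same pattern applies to the other str-based enumerations on this page.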
ai_flow.model.condition module
class ai_flow.model.condition.Condition(expect_event_keys: List[str])
Bases: object
Conditions that trigger scheduling.
- Parameters
expect_event_keys – The keys of events that this condition depends on.
- abstract is_met(event: notification_service.model.event.Event, context: ai_flow.model.context.Context) → bool
  Determine whether the condition is met.
  Parameters:
  event – The currently processed event.
  context – The context in which the condition is executed.
  Returns: True if the condition is met; False otherwise.
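A custom condition is written by subclassing Condition and implementing is_met. The following is a minimal sketch under stated assumptions: Event and Condition are local stand-ins that follow the documented signatures (the key and value fields of Event are assumed, not taken from this page), and ValueEqualsCondition is a hypothetical subclass.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, List


@dataclass
class Event:
    """Stand-in for notification_service.model.event.Event (fields are assumptions)."""
    key: str
    value: Any = None


class Condition(ABC):
    """Stand-in mirroring the documented Condition interface."""

    def __init__(self, expect_event_keys: List[str]):
        self.expect_event_keys = expect_event_keys

    @abstractmethod
    def is_met(self, event: Event, context: Any) -> bool:
        ...


class ValueEqualsCondition(Condition):
    """Hypothetical condition: met when the event carries an expected value."""

    def __init__(self, event_key: str, expected_value: Any):
        super().__init__(expect_event_keys=[event_key])
        self.expected_value = expected_value

    def is_met(self, event: Event, context: Any) -> bool:
        # Only events with a key this condition depends on can satisfy it.
        return (event.key in self.expect_event_keys
                and event.value == self.expected_value)


condition = ValueEqualsCondition('data_ready', expected_value='2024-01-01')
met = condition.is_met(Event('data_ready', '2024-01-01'), context=None)
not_met = condition.is_met(Event('data_ready', 'other'), context=None)
```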
ai_flow.model.context module
class ai_flow.model.context.Context
Bases: object
The context in which custom logic is executed.
- get_state(state_descriptor: ai_flow.model.state.StateDescriptor) → ai_flow.model.state.State
  Get the State object.
  Parameters:
  state_descriptor – Description of the State object.
  Returns: The State object.
- get_task_status(task_name) → ai_flow.model.status.TaskStatus
  Get the task status by task name.
  Parameters:
  task_name – The name of the task.
ai_flow.model.execution_type module
class ai_flow.model.execution_type.ExecutionType(value)
Bases: str, enum.Enum
Enumeration of the ways in which the execution of a workflow or task is triggered.
MANUAL: Manually triggered execution.
EVENT: Event-triggered execution.
PERIODIC: Periodically triggered execution.
- EVENT = 'EVENT'
- MANUAL = 'MANUAL'
- PERIODIC = 'PERIODIC'
ai_flow.model.operator module
class ai_flow.model.operator.AIFlowOperator(task_name: str, **kwargs)
Bases: ai_flow.model.operator.Operator
AIFlowOperator is a template that defines a task and provides AIFlow’s native Operator interface. To derive from this class, override the constructor as well as the abstract methods.
- Parameters
task_name – The operator’s name.
kwargs – Operator’s extended parameters.
- await_termination(context: ai_flow.model.context.Context, timeout: Optional[int] = None)
  Wait for a task instance to finish.
  Parameters:
  context – The context in which the operator is executed.
  timeout – If timeout is None, wait until the task ends. Otherwise, wait until the task ends or until timeout seconds have elapsed.
- get_metrics(context: ai_flow.model.context.Context) → Dict
  Get the metrics of a task instance.
- abstract start(context: ai_flow.model.context.Context)
  Start a task instance.
- stop(context: ai_flow.model.context.Context)
  Stop a task instance.
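Deriving an operator means overriding the constructor and start, and optionally stop, await_termination and get_metrics. The sketch below shows one way this might look. The AIFlowOperator base here is a local stand-in that copies only the documented method signatures, and SleepOperator with its seconds parameter is hypothetical; it uses a timer thread to imitate a task so that the timeout behaviour of await_termination can be demonstrated.

```python
import threading
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional


class AIFlowOperator(ABC):
    """Stand-in mirroring the documented AIFlowOperator methods."""

    def __init__(self, task_name: str, **kwargs):
        self.task_name = task_name
        self.kwargs = kwargs

    @abstractmethod
    def start(self, context: Any):
        ...

    def stop(self, context: Any):
        pass

    def await_termination(self, context: Any, timeout: Optional[int] = None):
        pass

    def get_metrics(self, context: Any) -> Dict:
        return {}


class SleepOperator(AIFlowOperator):
    """Hypothetical operator whose 'task' is a background timer."""

    def __init__(self, task_name: str, seconds: float, **kwargs):
        super().__init__(task_name, **kwargs)
        self.seconds = seconds
        self._done = threading.Event()
        self._timer: Optional[threading.Timer] = None

    def start(self, context: Any):
        # Launch the work without blocking; the timer sets the flag when it fires.
        self._timer = threading.Timer(self.seconds, self._done.set)
        self._timer.start()

    def stop(self, context: Any):
        # Cancel pending work and mark the task as finished.
        if self._timer is not None:
            self._timer.cancel()
        self._done.set()

    def await_termination(self, context: Any, timeout: Optional[int] = None):
        # wait(None) blocks until the flag is set; otherwise at most `timeout` seconds.
        self._done.wait(timeout)

    def get_metrics(self, context: Any) -> Dict:
        return {'finished': self._done.is_set()}


op = SleepOperator('sleep_task', seconds=0.05)
op.start(context=None)
op.await_termination(context=None, timeout=5)
metrics = op.get_metrics(context=None)
```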
class ai_flow.model.operator.Operator(name: str, **kwargs)
Bases: object
Operator is a template that defines a task. It is the abstract base class for all operators. To derive from this class, override the constructor. This class is abstract and shouldn’t be instantiated directly. Instantiating a class derived from it creates a task object, which ultimately becomes a task in a Workflow.
- Parameters
name – The operator’s name.
kwargs – Operator’s extended parameters.
- action_on_condition(action: ai_flow.model.action.TaskAction, condition: ai_flow.model.condition.Condition)
  Schedule the task based on a specified condition.
  Parameters:
  action – The action for scheduling the task.
  condition – The condition that scheduling the task depends on.
- action_on_event_received(action: ai_flow.model.action.TaskAction, event_key: str)
  Schedule the task when the specified event is received.
  Parameters:
  action – The action for scheduling the task.
  event_key – The key of the event that scheduling the task depends on.
- action_on_task_status(action: ai_flow.model.action.TaskAction, upstream_task_status_dict: Dict[ai_flow.model.operator.Operator, ai_flow.model.status.TaskStatus])
  Schedule the task based on the status of upstream tasks.
  Parameters:
  action – The action for scheduling the task.
  upstream_task_status_dict – The upstream tasks and the status of each that scheduling the task depends on.
- start_after(tasks: Union[ai_flow.model.operator.Operator, List[ai_flow.model.operator.Operator]])
  Start the task after the upstream tasks succeed.
  Parameters:
  tasks – The upstream tasks.
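The scheduling methods above can be read as declarations of the form "apply this action once these upstream tasks reach these statuses". The sketch below illustrates that reading with a local stand-in Operator that only records the declared rules. Treating start_after as shorthand for action_on_task_status with START and SUCCESS is an assumption based on its description, the rules attribute is hypothetical, and the TaskStatus stand-in is trimmed to the two members used here.

```python
from enum import Enum
from typing import Dict, List, Tuple, Union


class TaskAction(str, Enum):
    START = 'START'
    RESTART = 'RESTART'
    STOP = 'STOP'


class TaskStatus(str, Enum):
    SUCCESS = 'SUCCESS'
    FAILED = 'FAILED'


class Operator:
    """Stand-in that only records scheduling rules; not the real ai_flow class."""

    def __init__(self, name: str, **kwargs):
        self.name = name
        # Hypothetical attribute: (action, {upstream task name: expected status}).
        self.rules: List[Tuple[TaskAction, Dict[str, TaskStatus]]] = []

    def action_on_task_status(self, action: TaskAction,
                              upstream_task_status_dict: Dict['Operator', TaskStatus]):
        # Store upstream tasks by name so the rule is easy to inspect.
        expected = {op.name: status
                    for op, status in upstream_task_status_dict.items()}
        self.rules.append((action, expected))

    def start_after(self, tasks: Union['Operator', List['Operator']]):
        # Assumption: equivalent to "START once every upstream task is SUCCESS".
        upstream = tasks if isinstance(tasks, list) else [tasks]
        self.action_on_task_status(
            TaskAction.START, {op: TaskStatus.SUCCESS for op in upstream})


extract = Operator('extract')
transform = Operator('transform')
alert = Operator('alert')

# Run transform after extract succeeds; run alert only if transform fails.
transform.start_after(extract)
alert.action_on_task_status(TaskAction.START, {transform: TaskStatus.FAILED})
```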
ai_flow.model.rule module
class ai_flow.model.rule.TaskRule(condition: ai_flow.model.condition.Condition, action: ai_flow.model.action.TaskAction)
Bases: object
Rules that trigger task scheduling.
- Parameters
condition – Trigger condition of the rule.
action – The execution command for the scheduled task.
- trigger(event: notification_service.model.event.Event, context: ai_flow.model.context.Context) → Optional[ai_flow.model.action.TaskAction]
  Determine whether to trigger task scheduling behavior.
  Parameters:
  event – The currently processed event.
  context – The context in which the rule is executed.
  Returns: None if task scheduling is not triggered; otherwise, the execution command for scheduling the task.
class ai_flow.model.rule.WorkflowRule(condition: ai_flow.model.condition.Condition)
Bases: object
Rules that trigger workflow scheduling.
- Parameters
condition – Trigger condition of the rule.
- trigger(event: notification_service.model.event.Event, context: ai_flow.model.context.Context) → bool
  Determine whether to trigger workflow running.
  Parameters:
  event – The currently processed event.
  context – The context in which the rule is executed.
  Returns: True to start a WorkflowExecution; False to not start one.
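The two trigger methods differ only in what they return when the condition holds: TaskRule yields the configured TaskAction (or None), while WorkflowRule yields a bool. The sketch below illustrates these return contracts with local stand-ins. FunctionCondition is a hypothetical helper, events are plain strings for brevity, and the bodies of trigger are an assumption derived from the descriptions above, not the library's implementation.

```python
from enum import Enum
from typing import Any, Callable, Optional


class TaskAction(str, Enum):
    START = 'START'
    RESTART = 'RESTART'
    STOP = 'STOP'


class FunctionCondition:
    """Hypothetical stand-in condition that delegates is_met to a plain function."""

    def __init__(self, predicate: Callable[[Any, Any], bool]):
        self.predicate = predicate

    def is_met(self, event: Any, context: Any) -> bool:
        return self.predicate(event, context)


class TaskRule:
    """Stand-in following the documented return contract of TaskRule.trigger."""

    def __init__(self, condition: FunctionCondition, action: TaskAction):
        self.condition = condition
        self.action = action

    def trigger(self, event: Any, context: Any) -> Optional[TaskAction]:
        # Return the action when the condition holds, otherwise None.
        return self.action if self.condition.is_met(event, context) else None


class WorkflowRule:
    """Stand-in following the documented return contract of WorkflowRule.trigger."""

    def __init__(self, condition: FunctionCondition):
        self.condition = condition

    def trigger(self, event: Any, context: Any) -> bool:
        # True means a WorkflowExecution should be started.
        return self.condition.is_met(event, context)


on_ready = FunctionCondition(lambda event, context: event == 'data_ready')
task_rule = TaskRule(on_ready, TaskAction.RESTART)
workflow_rule = WorkflowRule(on_ready)

fired = task_rule.trigger('data_ready', context=None)
skipped = task_rule.trigger('other', context=None)
starts_workflow = workflow_rule.trigger('data_ready', context=None)
```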
ai_flow.model.state module
class ai_flow.model.state.StateDescriptor(name)
Bases: object
User-defined state description.
class ai_flow.model.state.ValueState
Bases: ai_flow.model.state.State
Single-valued user-defined state.
class ai_flow.model.state.ValueStateDescriptor(name)
Bases: ai_flow.model.state.StateDescriptor
Single-valued user-defined state description.
ai_flow.model.status module
class ai_flow.model.status.TaskStatus(value)
Bases: str, enum.Enum
Enumeration of TaskExecution’s status.
INIT: The initial status of TaskExecution.
QUEUED: The TaskExecution has been assigned to an executor.
RESTARTING: The TaskExecution was requested to restart when it was running.
RUNNING: The TaskExecution is running.
SUCCESS: The TaskExecution finished running without errors.
FAILED: The TaskExecution had errors during execution and failed to run.
STOPPING: The TaskExecution was externally requested to shut down when it was running.
STOPPED: The TaskExecution was externally shut down.
RETRYING: The TaskExecution failed, but has retry attempts left and will be rescheduled.
- FAILED = 'FAILED'
- INIT = 'INIT'
- QUEUED = 'QUEUED'
- RETRYING = 'RETRYING'
- RUNNING = 'RUNNING'
- STOPPED = 'STOPPED'
- STOPPING = 'STOPPING'
- SUCCESS = 'SUCCESS'
class ai_flow.model.status.WorkflowStatus(value)
Bases: str, enum.Enum
Enumeration of WorkflowExecution’s status.
INIT: The initial status of WorkflowExecution.
RUNNING: The WorkflowExecution is running.
SUCCESS: The WorkflowExecution finished running without errors.
FAILED: The WorkflowExecution had errors during execution and failed to run.
STOPPED: The WorkflowExecution has been stopped.
- FAILED = 'FAILED'
- INIT = 'INIT'
- RUNNING = 'RUNNING'
- STOPPED = 'STOPPED'
- SUCCESS = 'SUCCESS'
ai_flow.model.task_execution module
class ai_flow.model.task_execution.TaskExecution(workflow_execution_id: int, task_name: str, sequence_number: int, execution_type: ai_flow.model.execution_type.ExecutionType, begin_date: Optional[datetime.datetime] = None, end_date: Optional[datetime.datetime] = None, status: ai_flow.model.status.TaskStatus = <TaskStatus.INIT: 'INIT'>, id: Optional[int] = None)
Bases: object
TaskExecution describes an instance of a task. It can be created by the scheduler.
- Parameters
workflow_execution_id – The unique identifier of the WorkflowExecution that the TaskExecution belongs to.
task_name – The name of the task that the TaskExecution belongs to.
sequence_number – A task in a WorkflowExecution can be run multiple times; this number indicates how many times the task has been run.
execution_type – The type of trigger that started the TaskExecution.
begin_date – The time at which the TaskExecution started executing.
end_date – The time at which the TaskExecution finished executing.
status – The current status of the TaskExecution.
id – The unique ID of the TaskExecution.
ai_flow.model.workflow module
class ai_flow.model.workflow.Workflow(name: str, namespace: str = 'default', **kwargs)
Bases: object
Workflow is a collection of tasks and trigger rules. A Workflow can be triggered by events, manually, or on a periodic schedule. For each execution, the workflow runs its individual tasks when their triggering rules are met. Workflows essentially act as namespaces for tasks. A task_id can only be added once to a Workflow.
- Parameters
name – The name of the workflow.
- action_on_condition(task_name: str, action: ai_flow.model.action.TaskAction, condition: ai_flow.model.condition.Condition)
- action_on_event_received(task_name: str, event_key: str, action: ai_flow.model.action.TaskAction)
- action_on_task_status(task_name: str, action: ai_flow.model.action.TaskAction, upstream_task_status_dict: Dict[ai_flow.model.operator.Operator, ai_flow.model.status.TaskStatus])
class
ai_flow.model.workflow.WorkflowContextManager[source]¶ Bases:
objectWorkflow context manager is used to keep the current Workflow when Workflow is used as ContextManager. You can use Workflow as context: .. code-block:: python
- with Workflow(
name = ‘workflow’
- ) as workflow:
…
If you do this the context stores the Workflow and whenever new task is created, it will use such Workflow as the parent Workflow.
-
classmethod
get_current_workflow() → Optional[ai_flow.model.workflow.Workflow][source]¶
-
classmethod
pop_context_managed_workflow() → Optional[ai_flow.model.workflow.Workflow][source]¶
-
classmethod
push_context_managed_workflow(workflow: ai_flow.model.workflow.Workflow)[source]¶
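The three class methods suggest a push/pop protocol driven by the with statement. The sketch below shows one way such a protocol might work. Both classes are local stand-ins, and the internal bookkeeping (the _current and _previous attributes, and the calls made from __enter__ and __exit__) is an assumption for illustration, not the library's implementation.

```python
from typing import List, Optional


class Workflow:
    """Stand-in Workflow that registers itself while used in a `with` block."""

    def __init__(self, name: str, namespace: str = 'default', **kwargs):
        self.name = name
        self.namespace = namespace

    def __enter__(self) -> 'Workflow':
        WorkflowContextManager.push_context_managed_workflow(self)
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        WorkflowContextManager.pop_context_managed_workflow()


class WorkflowContextManager:
    """Stand-in that tracks the current Workflow; the stack is an assumption."""

    _current: Optional[Workflow] = None
    _previous: List[Workflow] = []

    @classmethod
    def push_context_managed_workflow(cls, workflow: Workflow):
        # Remember the enclosing workflow (if any) so nested blocks can restore it.
        if cls._current is not None:
            cls._previous.append(cls._current)
        cls._current = workflow

    @classmethod
    def pop_context_managed_workflow(cls) -> Optional[Workflow]:
        old = cls._current
        cls._current = cls._previous.pop() if cls._previous else None
        return old

    @classmethod
    def get_current_workflow(cls) -> Optional[Workflow]:
        return cls._current


with Workflow(name='workflow') as workflow:
    # A task created here could look up its parent via get_current_workflow().
    inside = WorkflowContextManager.get_current_workflow()
outside = WorkflowContextManager.get_current_workflow()
```

Keeping the previously active workflow on a stack lets nested with blocks restore the outer Workflow on exit.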
ai_flow.model.workflow_execution module
class ai_flow.model.workflow_execution.WorkflowExecution(workflow_id, execution_type: ai_flow.model.execution_type.ExecutionType, begin_date: datetime.datetime, end_date: datetime.datetime, status: ai_flow.model.status.WorkflowStatus, id: Optional[int] = None)
Bases: object
WorkflowExecution describes an instance of a Workflow. It can be created by the scheduler.
- Parameters
workflow_id – The unique identifier of the Workflow that the WorkflowExecution belongs to.
execution_type – The type of trigger that started the WorkflowExecution.
begin_date – The time at which the WorkflowExecution started executing.
end_date – The time at which the WorkflowExecution finished executing.
status – The current status of the WorkflowExecution.
id – The unique ID of the WorkflowExecution.