Running AIFlow in Docker¶
This section shows how to start AIFlow in a Docker container, which saves you from managing a Python environment and its dependencies yourself.
Pulling Docker Image¶
Run the following command to pull the latest AIFlow Docker image.
docker pull flinkaiflow/flink-ai-flow-dev:latest
Running Docker Container¶
Run the following command to start a Docker container and enter it in interactive mode.
docker run -it flinkaiflow/flink-ai-flow-dev:latest /bin/bash
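If the workflow file is written on the host, one way to make it visible inside the container is a bind mount. The sketch below only assembles the `docker run` argument list and prints it. The `-v` bind mount is an addition that this guide does not use, and the function name, host directory, and container directory are hypothetical placeholders.

```python
import shlex


def docker_run_command(image, host_dir=None, container_dir="/workspace"):
    """Build the argv for an interactive `docker run` of the given image.

    host_dir and container_dir are hypothetical paths used for illustration;
    when host_dir is set, it is bind-mounted into the container with -v.
    """
    argv = ["docker", "run", "-it"]
    if host_dir is not None:
        argv += ["-v", f"{host_dir}:{container_dir}"]
    argv += [image, "/bin/bash"]
    return argv


command = docker_run_command(
    "flinkaiflow/flink-ai-flow-dev:latest",
    host_dir="/path/to/workflows",  # hypothetical host directory
)
# Print the command as a single shell-quoted line for copy and paste.
print(shlex.join(command))
```

Without `host_dir`, the function returns the same command shown above.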
Starting AIFlow¶
Starting Notification Server¶
AIFlow depends on the notification service as its event dispatcher. Before running AIFlow, you need to start the notification server.
# Initialize configuration
notification config init
# Initialize database and tables
notification db init
# Start notification server as a daemon
notification server start -d &
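Because the server starts in the background, it can be useful to confirm that it accepts connections before moving on. The following is a minimal sketch using only the Python standard library. The helper name `wait_for_port` is hypothetical, and port 50052 is an assumption taken from the client address used in the workflow example later in this guide.

```python
import socket
import time


def wait_for_port(host, port, timeout=30.0, interval=0.5):
    """Return True once a TCP connection to host:port succeeds.

    Returns False if no connection could be made within `timeout` seconds.
    """
    deadline = time.monotonic() + timeout
    while True:
        try:
            # A successful connect means something is listening on the port.
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)


if __name__ == "__main__":
    # Assumed address: the workflow example connects to localhost:50052.
    ready = wait_for_port("localhost", 50052, timeout=5.0)
    print("notification server reachable" if ready else "notification server not reachable yet")
```

A successful TCP connection only shows that a process is listening; it does not prove that the service is fully initialized.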
Starting AIFlow Server¶
# Initialize configuration
aiflow config init
# Initialize database and tables
aiflow db init
# Start AIFlow server as a daemon
aiflow server start -d &
Note
You may run into issues caused by differences between operating systems or versions. If so, refer to the Troubleshooting section for solutions.
Running a Workflow¶
Defining a Workflow¶
Below is a typical event-driven workflow. It contains four tasks: task3 starts once both task1 and task2 have finished, and task3 then sends a custom event that triggers task4 to start.
import time

from ai_flow.model.action import TaskAction
from ai_flow.notification.notification_client import AIFlowNotificationClient
from ai_flow.operators.bash import BashOperator
from ai_flow.operators.python import PythonOperator
from ai_flow.model.workflow import Workflow

EVENT_KEY = "key"


def func():
    time.sleep(5)
    notification_client = AIFlowNotificationClient("localhost:50052")
    print(f"Sending event with key: {EVENT_KEY}")
    notification_client.send_event(key=EVENT_KEY,
                                   value='This is a custom message.')


with Workflow(name='quickstart_workflow') as w1:
    task1 = BashOperator(name='task1', bash_command='echo I am the 1st task.')
    task2 = BashOperator(name='task2', bash_command='echo I am the 2nd task.')
    task3 = PythonOperator(name='task3', python_callable=func)
    task4 = BashOperator(name='task4', bash_command='echo I am the 4th task.')

    task3.start_after([task1, task2])
    task4.action_on_event_received(action=TaskAction.START, event_key=EVENT_KEY)
Save the workflow above as a Python file and note its path as ${path_of_the_workflow_file}. Because the aiflow commands in this guide run inside the container, the path must be valid there.
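The start rules of this workflow can be traced with a toy, single-threaded model in plain Python. The sketch does not use AIFlow and says nothing about how its scheduler is implemented; the `simulate` function and its parameters are hypothetical names chosen for illustration.

```python
def simulate(tasks, start_after, start_on_event, emits):
    """Return the order in which tasks complete in a toy sequential model.

    tasks: task names, in the order they are examined
    start_after: task -> upstream tasks that must finish first
    start_on_event: task -> event key that starts the task
    emits: task -> event key the task sends when it runs
    """
    finished = []        # completion order
    seen_events = set()  # event keys sent so far
    pending = list(tasks)
    progress = True
    while pending and progress:
        progress = False
        for name in list(pending):
            if name in start_on_event:
                # Event-triggered task: runs only after its event was sent.
                ready = start_on_event[name] in seen_events
            else:
                # Dependency-triggered task: runs after all upstream tasks.
                ready = all(up in finished for up in start_after.get(name, []))
            if ready:
                pending.remove(name)
                finished.append(name)
                if name in emits:
                    seen_events.add(emits[name])
                progress = True
    return finished


order = simulate(
    tasks=["task4", "task3", "task2", "task1"],
    start_after={"task3": ["task1", "task2"]},
    start_on_event={"task4": "key"},
    emits={"task3": "key"},
)
print(order)
```

Even though task4 and task3 are examined first, task3 completes only after task1 and task2, and task4 completes only after task3 has sent the event.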
Uploading the Workflow¶
Now you can upload the workflow with the path of the file you just saved.
aiflow workflow upload ${path_of_the_workflow_file}
You can view the uploaded workflow with the following command:
aiflow workflow list --namespace default
Starting an Execution¶
An uploaded workflow runs as an instance called an execution. You can start a new execution with the following command:
aiflow workflow-execution start quickstart_workflow --namespace default
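The upload, list, and start steps can also be collected in a small script. The sketch below only builds and prints the three commands shown in this guide; the function name and the file location are hypothetical, and the commented-out `subprocess.run` call is one possible way to execute each step from inside the container.

```python
import shlex


def quickstart_commands(workflow_file, workflow_name, namespace="default"):
    """Return the upload, list, and start commands from this guide as argv lists."""
    return [
        ["aiflow", "workflow", "upload", workflow_file],
        ["aiflow", "workflow", "list", "--namespace", namespace],
        ["aiflow", "workflow-execution", "start", workflow_name, "--namespace", namespace],
    ]


# "/path/to/quickstart_workflow.py" is a hypothetical location for the saved file.
for argv in quickstart_commands("/path/to/quickstart_workflow.py", "quickstart_workflow"):
    print(shlex.join(argv))
    # To run the step instead of printing it (requires `import subprocess`):
    # subprocess.run(argv, check=True)
```

Passing argument lists rather than one shell string avoids quoting problems if the file path contains spaces.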
Viewing the Results¶
You can view the workflow execution you just started with the following command:
aiflow workflow-execution list quickstart_workflow --namespace default
The result shows the id, status, and other information about the workflow execution. If this is the first workflow you have executed, the id of the workflow execution should be 1, so you can list the tasks of that execution with the following command:
aiflow task-execution list 1
You can also check the logs under ${AIFLOW_HOME}/logs to view the output of the tasks.
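To see which log files exist, a short script can walk that directory. This is a minimal sketch: the helper name `list_log_files` is hypothetical, it assumes the AIFLOW_HOME environment variable is set in the shell where it runs, and it makes no assumption about how the log files are named or organized.

```python
import os
from pathlib import Path


def list_log_files(aiflow_home):
    """Return all regular files under <aiflow_home>/logs, sorted by path."""
    log_dir = Path(aiflow_home) / "logs"
    if not log_dir.is_dir():
        return []
    return sorted(p for p in log_dir.rglob("*") if p.is_file())


if __name__ == "__main__":
    home = os.environ.get("AIFLOW_HOME")
    if home is None:
        print("AIFLOW_HOME is not set")
    else:
        for path in list_log_files(home):
            print(path)
```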