Flows
Flows are the high-level containers that encapsulate and orchestrate AI-powered workflows in ControlFlow. They provide a structured and organized way to manage tasks, agents, tools, and context, enabling developers to build complex and dynamic applications with ease.
Creating Flows
Flows can be created using the Flow
class or the @flow
decorator.
The @flow
Decorator
The @flow
decorator provides a convenient way to define a flow using a Python function.
from controlflow import flow
@flow
def data_processing_flow():
data = load_data()
cleaned_data = clean_data(data)
insights = analyze_data(cleaned_data)
return insights
When using the @flow
decorator, the decorated function becomes the entry point for the flow. The function can contain tasks, which are automatically executed when the flow is run. The @flow
decorator also allows you to specify flow-level properties such as agents, tools, and context.
The Flow
Class
The Flow
class allows you to explicitly define a flow and its properties.
from controlflow import Flow
flow = Flow(
name="Data Processing Flow",
description="A flow to process and analyze data",
agents=[data_analyst, business_analyst],
tools=[data_loader, data_cleaner],
)
By creating a Flow
instance, you can specify the name, description, agents, tools, and other properties of the flow. This approach provides full control over the flow definition and is particularly useful when you need to customize the flow’s behavior or configure advanced settings.
Adding tasks to a flow
Tasks can be added to a flow object in two ways: by calling Flow.add_task(task)
or by creating the tasks inside the Flow
context manager.
Which Approach Should I Use?
tldr: Prefer the @flow
decorator for simplicity and conciseness. Use the Flow
class only for advanced customization and configuration.
Both the Flow
class and the @flow
decorator offer ways to compartmentalize and structure your workflow.
In general, users should prefer the @flow
decorator for its simplicity and ease of use. The @flow
decorator allows you to define a flow using a single function, making it easy to read and understand. It automatically registers tasks within the flow and handles the execution of tasks when the flow is run.
The Flow
class is only recommended for use when a group of tasks need to be quickly placed into a separate flow to isolate their work from the main flow. This circumstance is rare and can also be achieved by using the @flow
decorator with a separate function.
Flow Histories
Each flow maintains a shared history of all agent messages, actions, and tasks that were executed within it. This history is accessible to all agents and tasks within the flow, allowing them to share information and context. The shared history enables agents to collaborate, coordinate, and communicate effectively, leading to more intelligent and adaptive behavior. Every flow has a thread_id
that uniquely identifies its history, allowing agents to distinguish between different flows and maintain separate histories.
Private Histories
In general, every action an agent takes — including sending messages, using tools, and getting tool results — is recorded in the flow’s history as a series of agent messages. Sometimes, you may want to let agents work in isolation without making their activity visible to other agents. For example, if an agent is summarizing a large document, then it will have the entire text of the document somewhere in its own history; there’s no need to share that with other agents.
Because each flow creates a new thread, the simplest way to create a private history is to create a new flow. To facilitate this, flows automatically inherit a copy of their parent flow’s history.
import controlflow as cf
@cf.flow
def study_documents():
# ... other tasks
# creating a nested flow will also create a private history
# for the summary task; the full document text will not be
# visible to agents in the main `study_documents` flow
with cf.Flow() as document_flow:
summary = cf.Task('summarize the document', tools=[load_document]).run()
cf.Task('analyze the summary', context=dict(summary=summary))
# ... other tasks
Flow Properties
Flows have several key properties that define their behavior and configuration.
Name and Description
The name
and description
properties allow you to provide a human-readable name and a brief description of the flow. These properties help in identifying and understanding the purpose of the flow.
flow = Flow(
name="Data Processing Flow",
description="A flow to process and analyze data",
)
Agents and Tools
The agents
and tools
properties allow you to specify AI agents and tools that are available to tasks throughout the flow.
Flow-level agents are used by tasks unless the tasks have their own agents assigned. Flow-level tools are used by tasks in addition to any tools they have defined.
flow = Flow(
agents=[data_analyst, business_analyst],
tools=[data_loader, data_cleaner],
)
Context
The context
property allows you to define a shared context that is accessible to all tasks and agents within the flow. The context can contain any relevant information or data that is required throughout the flow.
flow = Flow(
context={
"data_source": "path/to/data.csv",
"target_audience": "marketing_team",
}
)
The context can be accessed and modified by tasks and agents during the flow execution, enabling dynamic and adaptive behavior based on the flow’s state.
Running Flows
To a run a @flow
decorated function, simply call the function with appropriate arguments. The arguments are automatically added to the flow’s context, making them visible to all tasks even if they aren’t passed directly to that task’s context. Any tasks returned from the flow are automatically resolved into their result
values.
To run a Flow
instance, use its run()
method, which executes all of the tasks that were defined within the flow. You can then access the results of individual tasks by referencing their result
attribute, or by calling them (if they are @task
-decorated functions).
What happens when a flow is run?
When a flow is run, the decorated function is executed and any tasks created within the function are registered with the flow. The flow then orchestrates the execution of the tasks, resolving dependencies, and managing the flow of data between tasks. If the flow function returns a task, or a nested collection of tasks, the flow will automatically replace them with their final results.
Controlling Execution
ControlFlow provides many mechanisms for determining how tasks are executed within a flow. So far, we’ve only looked at flows composed entirely of dependent tasks. These tasks form a DAG which is automatically executed when the flow runs.
Control Flow
Because a flow function is a regular Python function, you can use standard Python control flow to determine when tasks are executed and in what order. At any point, you can manually run()
any task in order to work with its result. Running a task inside a flow will also run any tasks it depends on.
In this flow, we flip a coin to determine which poem to write. The coin toss task is run manually, and the result is used to determine which poem task to return, using a standard Python if
statement:
@flow
def conditional_flow():
coin_toss_task = Task('Flip a coin', result_type=['heads', 'tails'])
# manually run the coin-toss task
outcome = coin_toss_task.run()
# generate a different task based on the outcome of the toss
if outcome == 'heads':
poem = Task('Write a poem about Mt. Rushmore', result_type=str)
elif outcome == 'tails':
poem = Task('Write a poem about the Grand Canyon', result_type=str)
# return the poem task
return poem
print(conditional_flow())
# Upon granite heights, 'neath skies of blue,
# Mount Rushmore stands, a sight to view.
# ...