Flows are the high-level containers that encapsulate and orchestrate AI-powered workflows in ControlFlow. They give tasks, agents, tools, and context a shared structure, so a complex workflow can be defined and managed in one place.

Creating Flows

Flows can be created using the Flow class or the @flow decorator.

The @flow Decorator

The @flow decorator provides a convenient way to define a flow using a Python function.

from controlflow import flow

@flow
def data_processing_flow():
    data = load_data()
    cleaned_data = clean_data(data)
    insights = analyze_data(cleaned_data)
    return insights

When using the @flow decorator, the decorated function becomes the entry point for the flow. The function can contain tasks, which are automatically executed when the flow is run. The @flow decorator also allows you to specify flow-level properties such as agents, tools, and context.

The Flow Class

The Flow class allows you to explicitly define a flow and its properties.

from controlflow import Flow

flow = Flow(
    name="Data Processing Flow",
    description="A flow to process and analyze data",
    agents=[data_analyst, business_analyst],
    tools=[data_loader, data_cleaner],
)

By creating a Flow instance, you can specify the name, description, agents, tools, and other properties of the flow. This approach provides full control over the flow definition and is particularly useful when you need to customize the flow’s behavior or configure advanced settings.

Adding Tasks to a Flow

Tasks can be added to a flow object in two ways: by calling Flow.add_task(task) or by creating the tasks inside the Flow context manager.

from controlflow import Flow, Task

with Flow() as flow:
    task = Task('Load data')

Which Approach Should I Use?

TL;DR: Prefer the @flow decorator for simplicity and conciseness. Use the Flow class only for advanced customization and configuration.

Both the Flow class and the @flow decorator offer ways to compartmentalize and structure your workflow.

In general, users should prefer the @flow decorator for its simplicity and ease of use. The @flow decorator allows you to define a flow using a single function, making it easy to read and understand. It automatically registers tasks within the flow and handles the execution of tasks when the flow is run.

The Flow class is recommended only when a group of tasks needs to be quickly placed into a separate flow to isolate its work from the main flow. This situation is rare, and the same isolation can be achieved by using the @flow decorator with a separate function.

Flow Histories

Each flow maintains a shared history of all agent messages, actions, and tasks that were executed within it. This history is accessible to all agents and tasks within the flow, allowing them to share information and context. The shared history enables agents to collaborate, coordinate, and communicate effectively, leading to more intelligent and adaptive behavior. Every flow has a thread_id that uniquely identifies its history, allowing agents to distinguish between different flows and maintain separate histories.

Private Histories

In general, every action an agent takes — including sending messages, using tools, and getting tool results — is recorded in the flow’s history as a series of agent messages. Sometimes, you may want to let agents work in isolation without making their activity visible to other agents. For example, if an agent is summarizing a large document, then it will have the entire text of the document somewhere in its own history; there’s no need to share that with other agents.

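Because each flow creates a new thread, the simplest way to create a private history is to create a new flow. To facilitate this, nested flows automatically inherit a copy of their parent flow’s history, so they can see what happened before them while keeping their own activity out of the parent’s history.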

import controlflow as cf

@cf.flow
def study_documents():

    # ... other tasks

    # creating a nested flow will also create a private history 
    # for the summary task; the full document text will not be 
    # visible to agents in the main `study_documents` flow
    with cf.Flow() as document_flow:
        summary = cf.Task('summarize the document', tools=[load_document]).run()

    cf.Task('analyze the summary', context=dict(summary=summary))

    # ... other tasks
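
The copy semantics described above can be modeled in a few lines of plain Python. This is a rough sketch under stated assumptions: HistoryFlow and its record method are hypothetical names, and a history is modeled as a simple list of strings rather than ControlFlow's actual message objects.

```python
class HistoryFlow:
    """Toy model of flow histories; not ControlFlow's implementation."""

    def __init__(self, parent=None):
        # A nested flow starts from a copy of the parent's history, so it can
        # read earlier messages but appends only to its own list.
        self.history = list(parent.history) if parent is not None else []

    def record(self, message):
        self.history.append(message)


main = HistoryFlow()
main.record("user: study these documents")

nested = HistoryFlow(parent=main)
nested.record("tool: <full document text>")
nested.record("agent: summary of the document")

# only the summary is handed back to the main flow, as explicit context
main.record("context: summary of the document")
```

The nested flow sees the earlier user message, while the main flow never sees the full document text that the nested flow loaded.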

Flow Properties

Flows have several key properties that define their behavior and configuration.

Name and Description

The name and description properties allow you to provide a human-readable name and a brief description of the flow. These properties help in identifying and understanding the purpose of the flow.

flow = Flow(
    name="Data Processing Flow",
    description="A flow to process and analyze data",
)

Agents and Tools

The agents and tools properties allow you to specify AI agents and tools that are available to tasks throughout the flow.

Flow-level agents are used by tasks unless the tasks have their own agents assigned. Flow-level tools are used by tasks in addition to any tools they have defined.

flow = Flow(
    agents=[data_analyst, business_analyst],
    tools=[data_loader, data_cleaner],
)
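
The precedence rules above (agents are overridden, tools are combined) can be written down as two small functions. This is only a sketch of the stated rules: the function names are hypothetical, plain strings stand in for agent and tool objects, and the de-duplication of tools is an assumption.

```python
def effective_agents(flow_agents, task_agents):
    # Task-level agents, when present, replace the flow-level agents.
    return list(task_agents) if task_agents else list(flow_agents)


def effective_tools(flow_tools, task_tools):
    # Tools are additive: flow-level tools plus any extra task-level tools.
    # Skipping duplicates is an assumption made for this sketch.
    return list(flow_tools) + [t for t in task_tools if t not in flow_tools]


flow_agents = ["data_analyst", "business_analyst"]
flow_tools = ["data_loader", "data_cleaner"]

# a task with no agents of its own falls back to the flow's agents
default_agents = effective_agents(flow_agents, [])

# a task with its own agent uses only that agent
custom_agents = effective_agents(flow_agents, ["statistician"])

# a task's tools are added to the flow's tools
combined_tools = effective_tools(flow_tools, ["plotter"])
```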

Context

The context property allows you to define a shared context that is accessible to all tasks and agents within the flow. The context can contain any relevant information or data that is required throughout the flow.

flow = Flow(
    context={
        "data_source": "path/to/data.csv",
        "target_audience": "marketing_team",
    }
)

The context can be accessed and modified by tasks and agents during the flow execution, enabling dynamic and adaptive behavior based on the flow’s state.

Running Flows

To run a @flow-decorated function, simply call the function with appropriate arguments. The arguments are automatically added to the flow’s context, making them visible to all tasks even if they aren’t passed directly to a task’s context. Any tasks returned from the flow are automatically resolved into their result values.

To run a Flow instance, use its run() method, which executes all of the tasks that were defined within the flow. You can then access the results of individual tasks by referencing their result attribute, or by calling them (if they are @task-decorated functions).

from controlflow import flow, Task

@flow
def item_flow():
    price = Task('generate a price between 1 and 1000', result_type=int)
    item = Task(
        'Come up with a common item that has the provided price',
        result_type=str,
        context=dict(price=price)
    )
    return item

# call the flow; the result is automatically resolved 
# as the result of the `item` task.
item = item_flow()

What Happens When a Flow Is Run?

When a flow is run, the decorated function is executed and any tasks created within the function are registered with the flow. The flow then orchestrates the execution of the tasks, resolving dependencies and managing the flow of data between tasks. If the flow function returns a task, or a nested collection of tasks, the flow will automatically replace them with their final results.
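
Replacing tasks in a nested return value with their results amounts to a recursive walk over the returned structure. The sketch below illustrates that idea only; ToyTask and resolve are hypothetical names, the tasks are assumed to be finished already, and the set of container types handled is an assumption.

```python
class ToyTask:
    """Stand-in for a finished task; not ControlFlow's Task class."""

    def __init__(self, result):
        self.result = result


def resolve(value):
    # a task is replaced by its result
    if isinstance(value, ToyTask):
        return value.result
    # containers are rebuilt with every element resolved in turn
    if isinstance(value, dict):
        return {key: resolve(item) for key, item in value.items()}
    if isinstance(value, (list, tuple, set)):
        return type(value)(resolve(item) for item in value)
    # anything else is returned unchanged
    return value


returned = {"price": ToyTask(42), "items": [ToyTask("lamp"), "plain value"]}
resolved = resolve(returned)
```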

Controlling Execution

ControlFlow provides many mechanisms for determining how tasks are executed within a flow. So far, we’ve only looked at flows composed entirely of dependent tasks. These tasks form a directed acyclic graph (DAG), which is automatically executed when the flow runs.
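
To illustrate what executing a DAG of dependent tasks means, the sketch below orders three tasks with Python's standard graphlib module (Python 3.9+). The task names are hypothetical and the dependency map is written by hand; how ControlFlow schedules tasks internally is not shown here.

```python
from graphlib import TopologicalSorter

# Each key is a task; its value is the set of tasks it depends on.
dependencies = {
    "price": set(),
    "item": {"price"},
    "description": {"item", "price"},
}

# static_order() yields every task after all of its dependencies
order = list(TopologicalSorter(dependencies).static_order())
```

Here the order is fully determined: price has no dependencies, item needs price, and description needs both.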

Control Flow

Because a flow function is a regular Python function, you can use standard Python control flow to determine when tasks are executed and in what order. At any point, you can manually run() any task in order to work with its result. Running a task inside a flow will also run any tasks it depends on.
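
The statement that running a task also runs its dependencies can be pictured as a recursive call that visits upstream tasks first. This is a toy model only: ToyTask, its compute callable, and the log argument are hypothetical and exist purely to make the order of execution visible.

```python
class ToyTask:
    """Toy task with explicit dependencies; not ControlFlow's Task class."""

    def __init__(self, name, compute, depends_on=()):
        self.name = name
        self.compute = compute
        self.depends_on = list(depends_on)
        self.done = False
        self.result = None

    def run(self, log):
        # a task that has already run simply returns its stored result
        if self.done:
            return self.result
        # run upstream tasks first and feed their results to this task
        inputs = [dep.run(log) for dep in self.depends_on]
        self.result = self.compute(*inputs)
        self.done = True
        log.append(self.name)
        return self.result


log = []
price = ToyTask("price", lambda: 250)
item = ToyTask("item", lambda p: f"an item that costs {p}", depends_on=[price])

# running `item` runs `price` first, because `item` depends on it
label = item.run(log)
```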

In this flow, we flip a coin to determine which poem to write. The coin toss task is run manually, and the result is used to determine which poem task to return, using a standard Python if statement:

from controlflow import flow, Task

@flow
def conditional_flow():
    coin_toss_task = Task('Flip a coin', result_type=['heads', 'tails'])
    # manually run the coin-toss task
    outcome = coin_toss_task.run()

    # generate a different task based on the outcome of the toss;
    # result_type limits the outcome to 'heads' or 'tails'
    if outcome == 'heads':
        poem = Task('Write a poem about Mt. Rushmore', result_type=str)
    else:
        poem = Task('Write a poem about the Grand Canyon', result_type=str)

    # return the poem task
    return poem

print(conditional_flow())
# Upon granite heights, 'neath skies of blue,
# Mount Rushmore stands, a sight to view.
# ...