A task represents a unit of work for your agents to complete. To execute that work and retrieve its result, you need to instruct your agents to run the task.

Creating and running a task

The most convenient way to run a task is to create and run it in a single call with the run function. The arguments for run are identical to the Task constructor, including an objective, agent assignments, and more. By default, the task will be run to completion and its result will be returned, or an error will be raised if the task fails.

This approach is so common that you’ll see it used throughout the ControlFlow documentation.
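As a minimal sketch, assuming ControlFlow is installed and an LLM provider is configured, creating and running a task in one call might look like this. The objective text is illustrative, and `result_type` is assumed here to be one of the Task constructor arguments that `run` accepts. The import sits inside the function only so the sketch can be defined without ControlFlow present.

```python
def main() -> None:
    # Imported here so that defining this sketch does not require ControlFlow
    # or an LLM API key; calling main() does.
    import controlflow as cf

    # Create a task and run it to completion in a single call.
    # An error is raised if the task fails.
    poem = cf.run("Write a four-line poem about AI", result_type=str)
    print(poem)
```

Call `main()` to run the task.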

@task-decorated functions

The @task decorator creates a task from a function. This approach is less common than using the run function, but can be useful for quickly “packaging” task definitions as reusable logic. To run the task at any time, simply call the function with appropriate arguments. This will create a new task and run it to completion, returning the result.
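The sketch below shows one way this might look. It assumes the decorator is exposed as `cf.task` and that the function's docstring, arguments, and return annotation are used as the task's instructions, context, and result type; `write_poem` is a hypothetical name chosen for illustration.

```python
def main() -> None:
    # Imported here so that defining this sketch does not require ControlFlow
    # or an LLM API key; calling main() does.
    import controlflow as cf

    @cf.task
    def write_poem(topic: str) -> str:
        """Write a four-line poem about the provided topic."""

    # Each call creates a new task and runs it to completion.
    print(write_poem(topic="AI"))
    print(write_poem(topic="the ocean"))
```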

Running existing tasks

In many cases, you’ll create one or more tasks that you want to run as a group, rather than creating and running each one individually. To run a list of tasks as a group, use the run_tasks function. All of the tasks will be run as part of the same flow (if you are not already operating in a flow context), so they share context and history throughout execution.

Here is an example of running a list of tasks:
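A minimal sketch, assuming ControlFlow is installed and an LLM provider is configured. The two objectives are illustrative, and `run_tasks` is assumed to return the tasks' results.

```python
def main() -> None:
    # Imported here so that defining this sketch does not require ControlFlow
    # or an LLM API key; calling main() does.
    import controlflow as cf

    task_1 = cf.Task("Write a poem about AI")
    task_2 = cf.Task("Critique the poem", depends_on=[task_1])

    # Both tasks run in the same flow, so the critique can see the poem.
    results = cf.run_tasks([task_1, task_2])
    print(results)
```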

Running a single task

Individual tasks have a convenient run method that executes the task and returns its result (or raises an error if the task fails):
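A minimal sketch, under the same assumptions as the other examples on this page (ControlFlow installed, an LLM provider configured, and an illustrative objective):

```python
def main() -> None:
    # Imported here so that defining this sketch does not require ControlFlow
    # or an LLM API key; calling main() does.
    import controlflow as cf

    task = cf.Task("Write a poem about AI")

    # Runs the task to completion and returns its result,
    # or raises an error if the task fails.
    poem = task.run()
    print(poem)
```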

Streaming

New in version 0.11

In addition to running tasks to completion, ControlFlow supports streaming events during task execution. This allows you to process or display intermediate outputs like agent messages, tool calls, and results in real-time.

To enable streaming, set stream=True when running tasks:

import controlflow as cf

# Stream all events
for event, snapshot, delta in cf.run("Write a poem", stream=True, handlers=[]):
    print(f"Event type: {event.event}")
    
    if event.event == "agent-content":
        print(f"Agent said: {snapshot}")
    elif event.event == "agent-tool-call":  
        print(f"Tool called: {snapshot}")

You can also filter which events you want to receive using the Stream enum:

import controlflow as cf

# Only stream content events
for event, content, delta in cf.run(
    "Write a poem",
    stream=cf.Stream.CONTENT,
    handlers=[], # remove the default print handler
):
    if delta:
        # Print incremental content updates
        print(delta, end="", flush=True) 
    else:
        # Print complete messages
        print(content)

For more details on working with streaming events, including programmatic event handlers, see the Streaming guide.

Multi-Agent Collaboration

For tasks involving multiple agents, ControlFlow needs a way to manage their collaboration. What makes this more complicated than simply making an LLM call and moving on to the next agent is that it may take multiple LLM calls to complete a single agentic “turn” of work.

It’s tempting to say that a single LLM call is equivalent to a single agentic turn. However, this approach breaks down quickly. If an agent uses a tool (one LLM call), it should almost always be invoked a second time to examine the tool result. If the system moved on after every LLM call, then the result could potentially be evaluated by an LLM that wasn’t designed to interpret the tool’s output. In addition, naively ending a turn after a tool call would prevent “thinking out loud” and other emergent, iterative behaviors.

Therefore, ControlFlow differentiates between LLM calls and agent turns.

  • Calls: each time an LLM is invoked
  • Turns: each time an agent is selected by the orchestrator. A turn may consist of multiple LLM calls.
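The difference between calls and turns can be pictured with a small simulation. This is a toy model, not ControlFlow's implementation: `ScriptedAgent` and `take_turn` are hypothetical, and the rule used here (a turn ends at the first call that produces plain text rather than a tool call) is a simplifying assumption.

```python
from dataclasses import dataclass


@dataclass
class ScriptedAgent:
    """Toy agent whose 'LLM calls' replay a fixed script of outputs."""

    name: str
    script: list  # each entry is "tool" (a tool call) or "text" (plain content)

    def llm_call(self) -> str:
        return self.script.pop(0)


def take_turn(agent: ScriptedAgent) -> int:
    """Run one turn and return how many LLM calls it used.

    Toy rule: the agent is re-invoked after every tool call so it can read
    the tool result; the turn ends at the first call that produces plain text.
    """
    calls = 0
    while agent.script:
        calls += 1
        if agent.llm_call() != "tool":
            break
    return calls


agent = ScriptedAgent("Researcher", ["tool", "tool", "text", "text"])
first = take_turn(agent)   # consumes: tool, tool, text
second = take_turn(agent)  # consumes: text
print(first, second)
```

Here the first turn takes three LLM calls and the second takes one, even though both count as a single turn.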

Since the number of calls per turn can vary, we need a way to determine when an agent’s turn is over, and how to select the next agent to act. These are referred to as turn strategies. Understanding and choosing the right turn strategy for your use case can significantly impact the efficiency and effectiveness of your multi-agent workflows.

This table describes the different turn strategies available in ControlFlow. The default strategy is Popcorn, which is a good, general-purpose strategy in which each agent ends its turn by picking the agent that should go next.

| TurnStrategy | Description | Ideal when… | Keep in mind… |
| --- | --- | --- | --- |
| Popcorn | Each agent takes a turn, then picks the next agent to go next. | All agents are generally capable of making decisions and have visibility into all tasks. | Requires one extra tool call per turn, to pick the next agent. |
| Moderated | A moderator agent always decides which agent should act next. | You want a dedicated agent to orchestrate the others, who may not be powerful enough to make decisions themselves. | Requires up to two extra tool calls per turn: one for the agent to end its turn (which could happen in parallel with other work if your LLM supports it) and another for the moderator to pick the next agent. |
| RoundRobin | Agents take turns in a round-robin fashion. | You want agents to work in a specific sequence. | May be less efficient than other strategies, especially if agents have varying workloads. |
| MostBusy | The agent with the most active tasks goes next. | You want to prioritize agents who have the most work to do. | May lead to task starvation for less busy agents. |
| Random | Invokes a random agent. | You want to distribute the load evenly across agents. | Can be inefficient; may select agents without relevant tasks. |
| SingleAgent | Only one agent is ever invoked. | You want to control the sequence of agents yourself. | Requires manual management; may not adapt well to dynamic scenarios. |
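To make the selection rules concrete, here is a rough sketch of how two of these strategies could choose the next agent. The functions `round_robin` and `most_busy` are hypothetical stand-ins written for illustration and work on plain agent names; they are not ControlFlow's strategy classes.

```python
from typing import Optional


def round_robin(agents: list, current: Optional[str]) -> str:
    """Pick the agent after `current`, wrapping around; start with the first."""
    if current is None:
        return agents[0]
    return agents[(agents.index(current) + 1) % len(agents)]


def most_busy(active_task_counts: dict) -> str:
    """Pick the agent with the most active tasks (ties go to the earliest key)."""
    return max(active_task_counts, key=active_task_counts.get)


agents = ["Agent 1", "Agent 2", "Agent 3"]
order = []
current = None
for _ in range(4):
    current = round_robin(agents, current)
    order.append(current)

print(order)
print(most_busy({"Agent 1": 1, "Agent 2": 3, "Agent 3": 2}))
```

The most-busy rule keeps selecting the same agent until its workload drops, which is the starvation risk noted in the table.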

Example: Round Robin

To use a turn strategy, provide it as an argument to the run() call. Here, we use a round robin strategy to ensure that each agent gets a turn in order:

import controlflow as cf

# create three agents
agent1 = cf.Agent(name="Agent 1")
agent2 = cf.Agent(name="Agent 2")
agent3 = cf.Agent(name="Agent 3")

cf.run(
    "Say hello to each other",
    instructions=(
        "Mark the task successful only when every "
        "agent has posted a message to the thread."
    ),
    agents=[agent1, agent2, agent3],
    # supply a turn strategy
    turn_strategy=cf.orchestration.turn_strategies.RoundRobin(),
)

Example: Moderated

We can also use the Moderated strategy to have a more powerful model orchestrate some smaller ones. In this example, we invite an “optimist” and “pessimist”, both powered by gpt-4o-mini, to debate the meaning of life. A “moderator” is tasked with picking the next agent to speak. Note that the moderator is also the only completion_agent, meaning it’s responsible for marking the task as successful.

import controlflow as cf

optimist = cf.Agent(name="Optimist", model="gpt-4o-mini")
pessimist = cf.Agent(name="Pessimist", model="gpt-4o-mini")
moderator = cf.Agent(name="Moderator")

cf.run(
    "Debate the meaning of life",
    instructions='Give each agent at least three chances to speak.',
    agents=[moderator, optimist, pessimist],
    completion_agents=[moderator],
    turn_strategy=cf.orchestration.turn_strategies.Moderated(moderator=moderator),
)

Advanced orchestration

All of the approaches described so far will run a group of tasks until they are marked as complete. However, you may want to exert more control over task execution. To do so, you’ll need to create and work with an Orchestrator directly.

The orchestration loop

When tasks are run, ControlFlow invokes an Orchestrator to coordinate agentic activity and complete the work. The orchestrator is ultimately responsible for creating the core agentic loop. In each iteration, an agent (or more specifically, an LLM) is invoked and all available information — the tasks, the flow, the agents, and the history of the conversation — is used to compile an appropriate prompt.

In the case of a single task and a single agent, this process is very straightforward, because there is no ambiguity about which LLM to invoke on each iteration. However, as the number of tasks and agents grows, the orchestrator loop becomes more complex.

The orchestrator always considers not only the tasks that were passed to it, but also all of those tasks’ dependencies and relationships. There are three types of relationships it considers to build a universe of relevant tasks:

  1. Subtasks: A task cannot be completed until all of its subtasks (or child tasks) are completed.
  2. Dependencies: A task cannot be completed until all of its upstream dependencies are completed.
  3. Parents: A task can have a parent, meaning that it is a subtask of another task. The orchestrator will consider all ancestors of a task when compiling a prompt, but it will not automatically attempt to run the parent tasks.

Once the set of tasks has been identified, the orchestrator begins the loop by considering tasks that are ready to run: all of their dependencies have been completed. From the subset of ready tasks, an agent is selected using the orchestrator’s turn strategy. The selected agent is invoked to make progress on its assigned tasks, after which the loop repeats.

This process continues until all of the provided tasks have been completed.
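The first two steps of the loop (building the universe of tasks and finding the ready ones) can be sketched in plain Python. This is a simplified model for illustration only: `ToyTask`, `collect_universe`, and `ready_tasks` are hypothetical names, and agent selection and parent handling are left out.

```python
from dataclasses import dataclass, field


@dataclass(eq=False)  # compare tasks by identity, not by field values
class ToyTask:
    name: str
    depends_on: list = field(default_factory=list)
    subtasks: list = field(default_factory=list)
    complete: bool = False


def collect_universe(tasks: list) -> list:
    """Return the given tasks plus all transitive dependencies and subtasks."""
    universe, stack = [], list(tasks)
    while stack:
        task = stack.pop()
        if task in universe:
            continue
        universe.append(task)
        stack.extend(task.depends_on + task.subtasks)
    return universe


def ready_tasks(universe: list) -> list:
    """Incomplete tasks whose upstream dependencies are all complete."""
    return [
        t for t in universe
        if not t.complete and all(d.complete for d in t.depends_on)
    ]


research = ToyTask("research")
outline = ToyTask("outline", depends_on=[research])
draft = ToyTask("draft", depends_on=[outline])

universe = collect_universe([draft])  # only `draft` is passed in
print(sorted(t.name for t in universe))
print([t.name for t in ready_tasks(universe)])

research.complete = True
print([t.name for t in ready_tasks(universe)])
```

Only `draft` is handed to the sketch, yet `research` is the first task that becomes ready, because it sits at the root of the dependency chain.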

Automatic dependency execution

One of the most powerful features of ControlFlow’s orchestration is its automatic execution of dependent tasks. This means that when you run a task, you don’t need to manually manage its dependencies or subtasks; the orchestrator handles this for you.

When a task is run, the orchestrator automatically executes any upstream dependencies before starting the task itself. It also ensures that all subtasks are completed before marking a parent task as complete. Parent tasks are considered for context, though the orchestrator won’t attempt to complete them unless specifically instructed to do so.

This automatic execution allows you to create complex task hierarchies without worrying about the order of execution. You can focus on defining the relationships between tasks, and let ControlFlow handle the intricacies of execution order.
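As a sketch of what this might look like in practice, the example below defines a name task and a poem task that depends on it, then runs only the poem task. The objectives are illustrative, and the example assumes ControlFlow is installed with an LLM provider configured.

```python
def main() -> None:
    # Imported here so that defining this sketch does not require ControlFlow
    # or an LLM API key; calling main() does.
    import controlflow as cf

    name_task = cf.Task("Choose a name for a fictional robot", result_type=str)
    poem_task = cf.Task(
        "Write a short poem about the robot",
        depends_on=[name_task],
    )

    # Only the downstream task is run; name_task is an upstream dependency,
    # so the orchestrator is expected to complete it first.
    poem = poem_task.run()
    print(poem)
```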

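Note that you could also run an upstream task eagerly (for example, calling name_task.run() on a task that produces a name) and then pass its result to a downstream task, such as one that writes a poem about that name. The best way to structure your workflow will depend on your specific use case and preferences.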

Managing the agentic loop

You can control the agentic loop by calling it iteratively with limits on the amount of work that can be done on each call. In this example, we manually invoke the author and critic agents for one turn each until the task is complete. Note that this example is contrived; if you actually wanted to loop over agents deterministically, the RoundRobin turn strategy is a better choice. However, “opening” the loop like this is a good choice when you want to dynamically select the next agent based on the results of the previous turn, or you want to run some custom logic between turns.

import controlflow as cf

author = cf.Agent(name="Author")
critic = cf.Agent(name="Critic")

task = cf.Task('Write a poem about AI, then critique it.', agents=[author, critic])

while task.is_incomplete():
    # run the author for one turn
    cf.run_tasks([task], agent=author, max_agent_turns=1)

    # run the critic for one turn
    cf.run_tasks([task], agent=critic, max_agent_turns=1)



Limiting agent turns

The max_agent_turns argument limits the number of agentic turns that can be taken in a single orchestration session. This limit is enforced by the orchestrator, which will end the turn early if the limit is reached.

A global default can be set with ControlFlow’s orchestrator_max_agent_turns setting.

Limiting LLM calls

The max_llm_calls argument limits the number of LLM calls that can be made during a single orchestration session. This limit is enforced by the orchestrator, which will end the turn early if the limit is reached. Note that this is enforced independently of the max_agent_turns limit.

A global default can be set with ControlFlow’s orchestrator_max_llm_calls setting.
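The way the two limits interact can be pictured with a small simulation. This is a toy model written for illustration, not ControlFlow's orchestrator: `run_session` is hypothetical, and each entry in its input list stands for the number of LLM calls a turn would need if left uninterrupted.

```python
def run_session(turn_call_counts, max_agent_turns=None, max_llm_calls=None):
    """Simulate one orchestration session; return (turns taken, LLM calls made)."""
    turns = calls = 0
    for needed in turn_call_counts:
        # Each limit is checked independently before a new turn starts.
        if max_agent_turns is not None and turns >= max_agent_turns:
            break
        if max_llm_calls is not None and calls >= max_llm_calls:
            break
        turns += 1
        for _ in range(needed):
            # The call limit can also cut a turn short partway through.
            if max_llm_calls is not None and calls >= max_llm_calls:
                break
            calls += 1
    return turns, calls


print(run_session([2, 3, 1]))                     # no limits
print(run_session([2, 3, 1], max_agent_turns=2))  # stops after two turns
print(run_session([2, 3, 1], max_llm_calls=4))    # second turn ends early
```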

Limiting LLM calls over the lifetime of a task

Each task has an optional max_llm_calls parameter, which limits the number of LLM calls that can be made during the task’s lifetime. A task will be marked as failed if the limit is reached and the task is not complete. The call count is incremented any time an LLM is invoked and the task is both “ready” and assigned to the active agent.

Here, we force a task to fail by limiting it to a single LLM call but requiring it to use a tool (which typically requires two LLM calls: one to use the tool and one to evaluate the result):
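A minimal sketch of such a task, assuming ControlFlow is installed and an LLM provider is configured. The `roll_die` tool and the objective are illustrative, and the exact exception raised on failure is not specified here.

```python
import random


def roll_die() -> int:
    """Roll a six-sided die."""
    return random.randint(1, 6)


def main() -> None:
    # Imported here so that defining this sketch does not require ControlFlow
    # or an LLM API key; calling main() does.
    import controlflow as cf

    task = cf.Task(
        "Roll a die and report the result",
        tools=[roll_die],
        max_llm_calls=1,
    )

    # Expected to raise: the single allowed call goes to invoking the tool,
    # leaving none for the agent to read the result and complete the task.
    task.run()
```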

Note that setting max_llm_calls on the task results in the task failing if the limit is reached. Setting max_llm_calls on the orchestrator only exits the loop early, but does not otherwise affect task behavior.

Early termination conditions

New in version 0.11

ControlFlow supports more flexible control over when an orchestration run should end through the use of run_until conditions. These conditions allow you to specify complex termination logic based on various factors such as task completion, failure, or custom criteria.

To use a run_until condition, pass it to the run_until parameter when calling run, run_async, run_tasks, or run_tasks_async. For example, the following tasks will run until either one of them is complete or 10 LLM calls have been made:

import controlflow as cf
from controlflow.orchestration.conditions import AnyComplete, MaxLLMCalls

result = cf.run_tasks(
    tasks=[cf.Task("write a poem about AI"), cf.Task("write a poem about ML")],
    run_until=AnyComplete() | MaxLLMCalls(10)
)

(Note that because tasks can be run in parallel, it’s possible for both tasks to be completed.)

Termination conditions can be combined using boolean logic: | indicates “or” and & indicates “and”. A variety of built-in conditions are available:

  • AllComplete(): stop when all tasks are complete (this is the default behavior)
  • MaxLLMCalls(n: int): stop when n LLM calls have been made (equivalent to providing max_llm_calls)
  • MaxAgentTurns(n: int): stop when n agent turns have been made (equivalent to providing max_agent_turns)
  • AnyComplete(tasks: list[Task], min_complete: int=1): stop when at least min_complete tasks are complete. If no tasks are provided, all of the orchestrator’s tasks will be used.
  • AnyFailed(tasks: list[Task], min_failed: int=1): stop when at least min_failed tasks have failed. If no tasks are provided, all of the orchestrator’s tasks will be used.
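The | and & composition described above is a common operator-overloading pattern. The sketch below shows how it can work in general; the classes `Condition`, `Combined`, `ToyAnyComplete`, and `ToyMaxLLMCalls` are hypothetical stand-ins that read from a plain dictionary, not ControlFlow's own condition classes.

```python
class Condition:
    """Base class: subclasses decide whether a run should stop."""

    def should_stop(self, state: dict) -> bool:
        raise NotImplementedError

    def __or__(self, other: "Condition") -> "Condition":
        return Combined(any, self, other)

    def __and__(self, other: "Condition") -> "Condition":
        return Combined(all, self, other)


class Combined(Condition):
    """Stop when any (or all) of the wrapped conditions say so."""

    def __init__(self, reducer, *conditions: Condition):
        self.reducer = reducer
        self.conditions = conditions

    def should_stop(self, state: dict) -> bool:
        return self.reducer(c.should_stop(state) for c in self.conditions)


class ToyAnyComplete(Condition):
    def __init__(self, min_complete: int = 1):
        self.min_complete = min_complete

    def should_stop(self, state: dict) -> bool:
        return state["complete"] >= self.min_complete


class ToyMaxLLMCalls(Condition):
    def __init__(self, n: int):
        self.n = n

    def should_stop(self, state: dict) -> bool:
        return state["llm_calls"] >= self.n


either = ToyAnyComplete() | ToyMaxLLMCalls(10)
both = ToyAnyComplete() & ToyMaxLLMCalls(10)

# No task is complete yet, but the call budget has been used up.
state = {"complete": 0, "llm_calls": 10}
print(either.should_stop(state), both.should_stop(state))
```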

Accessing an orchestrator directly

If you want to “step” through the agentic loop yourself, you can create and invoke an Orchestrator directly.

The orchestrator is instantiated with the following arguments:

  • tasks: a list of tasks that it is responsible for orchestrating. Note that it will also consider all of the tasks’ dependencies and subtasks, but the tasks listed here are the ones that determine whether it is finished.
  • flow: the flow in which to run the tasks. If not provided, a new flow will be created.
  • agent: an initial agent to invoke. If not provided, the turn_strategy will be used to select the next agent.
  • turn_strategy: the turn strategy to use to select the next agent. The default is Popcorn.

You can then use the orchestrator’s run() method to step through the loop manually. If you call run() with no arguments, it will continue until all of the provided tasks are complete. You can provide max_llm_calls and max_agent_turns to further limit the behavior.
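A rough sketch of stepping through the loop one turn at a time follows. The import path `controlflow.orchestration.orchestrator` is an assumption about the package layout, and an explicit flow is passed even though the arguments above describe it as optional. The example assumes ControlFlow is installed and an LLM provider is configured.

```python
def main() -> None:
    # Imported here so that defining this sketch does not require ControlFlow
    # or an LLM API key; calling main() does.
    import controlflow as cf
    from controlflow.orchestration.orchestrator import Orchestrator  # assumed path

    author = cf.Agent(name="Author")
    task = cf.Task("Write a poem about AI", agents=[author])

    orchestrator = Orchestrator(tasks=[task], flow=cf.Flow(), agent=author)

    # Step through the loop one agent turn at a time; custom logic
    # (logging, checks, early exit) could run between turns.
    while task.is_incomplete():
        orchestrator.run(max_agent_turns=1)

    print(task.result)
```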

Using handlers

Handlers in ControlFlow provide a way to observe and react to events that occur during task execution. They allow you to customize logging, monitoring, or take specific actions based on the orchestration process.

ControlFlow supports both synchronous and asynchronous handlers. Synchronous handlers implement the Handler interface, while asynchronous handlers implement the AsyncHandler interface. Both interfaces define methods for various events that can occur during task execution, including agent messages (and message deltas), user messages, tool calls, tool results, orchestrator sessions starting or stopping, and more.

ControlFlow includes a built-in PrintHandler that pretty-prints agent responses and tool calls to the terminal. It’s used by default if controlflow.settings.enable_default_print_handler=True and no other handlers are provided.

How handlers work

Whenever an event is generated by ControlFlow, the orchestrator will pass it to all of its registered handlers. Each handler will dispatch to one of its methods based on the type of event. For example, an AgentMessage event will be handled by the handler’s on_agent_message method (or on_agent_message_async for async handlers). The on_event method is always called for every event. This table describes all event types and the methods they are dispatched to:

| Event Type | Method |
| --- | --- |
| Event (all events) | on_event |
| UserMessage | on_user_message |
| OrchestratorMessage | on_orchestrator_message |
| AgentMessage | on_agent_message |
| AgentMessageDelta | on_agent_message_delta |
| ToolCall | on_tool_call |
| ToolResult | on_tool_result |
| OrchestratorStart | on_orchestrator_start |
| OrchestratorEnd | on_orchestrator_end |
| OrchestratorError | on_orchestrator_error |
| EndTurn | on_end_turn |
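The naming pattern in this table (an event type maps to an on_ method in snake case) can be sketched as a small dispatcher. This is an illustration of the pattern only: `method_name_for`, `dispatch`, `RecordingHandler`, and the stand-in event classes are hypothetical. Calling on_event before the specific method is an assumption, and ControlFlow's actual dispatch may work differently.

```python
import re


def method_name_for(event_type: str) -> str:
    """Map an event class name such as 'ToolCall' to 'on_tool_call'."""
    snake = re.sub(r"(?<!^)(?=[A-Z])", "_", event_type).lower()
    return f"on_{snake}"


class ToolCall:  # stand-in event type, not ControlFlow's class
    pass


class EndTurn:  # stand-in event type, not ControlFlow's class
    pass


class RecordingHandler:
    """Records which handler methods were invoked."""

    def __init__(self):
        self.seen = []

    def on_event(self, event):
        self.seen.append("on_event")

    def on_tool_call(self, event):
        self.seen.append("on_tool_call")


def dispatch(handler, event) -> None:
    handler.on_event(event)  # called for every event
    method = getattr(handler, method_name_for(type(event).__name__), None)
    if method is not None:
        method(event)


handler = RecordingHandler()
dispatch(handler, ToolCall())
dispatch(handler, EndTurn())  # no on_end_turn defined, so only on_event fires
print(handler.seen)
```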

Writing a custom handler

To create a custom handler, subclass either the Handler class for synchronous handlers or the AsyncHandler class for asynchronous handlers. Implement the methods for the events you’re interested in. Here are examples of both types:

Synchronous Handler

import controlflow as cf
from controlflow.orchestration.handler import Handler
from controlflow.events.events import AgentMessage

class LoggingHandler(Handler):
    def on_agent_message(self, event: AgentMessage):
        print(f"Agent {event.agent.name} said: {event.ai_message.content}")

cf.run("Write a short poem about AI", handlers=[LoggingHandler()])

Asynchronous Handler

New in version 0.11.1

import asyncio
import controlflow as cf
from controlflow.orchestration.handler import AsyncHandler
from controlflow.events.events import AgentMessage

class AsyncLoggingHandler(AsyncHandler):
    async def on_agent_message(self, event: AgentMessage):
        await asyncio.sleep(0.1)  # Simulate some async operation
        print(f"Agent {event.agent.name} said: {event.ai_message.content}")

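async def main():
    await cf.run_async("Write a short poem about AI", handlers=[AsyncLoggingHandler()])

# Top-level await only works in notebooks and async REPLs;
# in a regular script, start an event loop explicitly.
asyncio.run(main())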

When using asynchronous handlers, make sure to use the run_async function or other asynchronous methods in ControlFlow to properly handle the asynchronous events.

You can use both synchronous and asynchronous handlers together in the same async run. The orchestrator will automatically handle both types appropriately.