Manage task dependencies to create complex, multi-step workflows.

In complex workflows, tasks often need to be executed in a specific order. Some tasks may rely on the outputs of others, or there might be a logical sequence that must be followed to achieve the desired outcome. ControlFlow provides several mechanisms to define and manage these task dependencies, ensuring that your workflow executes in the correct order and that data flows properly between tasks.

ControlFlow offers three primary ways to establish dependencies between tasks: sequential dependencies, context dependencies, and subtask relationships. Sequential dependencies express pure ordering, context dependencies pass one task's result to another, and subtask relationships group work under a parent task.

Sequential Dependencies

Sequential dependencies are the most straightforward way to specify that one task must wait for another to complete before it can begin. This is done using the depends_on parameter when creating a task.

import controlflow as cf

@cf.flow
def research_flow():
    gather_sources = cf.Task("Gather research sources", result_type=list[str])
    
    analyze_sources = cf.Task(
        "Analyze gathered sources",
        result_type=dict,
        depends_on=[gather_sources]  # explicit dependency
    )
    
    return analyze_sources

result = research_flow()
print(result)

In this example, analyze_sources will not start until gather_sources has completed successfully.

Context Dependencies

Context dependencies are created when you use the result of one task as input for another. This creates an implicit dependency between the tasks.

import controlflow as cf

@cf.flow
def research_flow():
    gather_sources = cf.Task("Gather research sources", result_type=list[str])
    
    analyze_sources = cf.Task(
        "Analyze gathered sources",
        result_type=dict,
        context={"sources": gather_sources}  # implicit dependency
    )
    
    return analyze_sources

result = research_flow()
print(result)

Here, analyze_sources depends on gather_sources because it needs the sources data to perform its analysis.
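
To make the idea of an implicit dependency concrete, here is a minimal pure-Python sketch of how task objects placed in a context dictionary could be detected as upstream tasks. SketchTask and its upstream method are hypothetical names invented for illustration; this is not ControlFlow's Task class and does not describe its internals.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass(eq=False)  # eq=False: compare tasks by identity, not by field values
class SketchTask:
    """Hypothetical stand-in for a task; not ControlFlow's Task class."""

    objective: str
    depends_on: list["SketchTask"] = field(default_factory=list)
    context: dict[str, Any] = field(default_factory=dict)

    def upstream(self) -> list["SketchTask"]:
        # Explicit dependencies come first, then any task found among context values.
        found = list(self.depends_on)
        for value in self.context.values():
            if isinstance(value, SketchTask) and value not in found:
                found.append(value)
        return found


gather = SketchTask("Gather research sources")
analyze = SketchTask(
    "Analyze gathered sources",
    context={"sources": gather, "style": "brief"},  # only `gather` is a task
)

print([t.objective for t in analyze.upstream()])
```

In this sketch, plain values such as the "style" string are ignored, and only context entries that are themselves tasks count as dependencies.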

Subtask Relationships

Subtasks create a hierarchical dependency structure. A parent task is considered complete only when all its subtasks have finished.

import controlflow as cf

@cf.flow
def review_flow(doc):
    with cf.Task("Review the document", result_type=str, context=dict(doc=doc)) as review:
        cf.Task("Proofread")
        cf.Task("Format")
    
    return review

# review_flow requires a document; this sample string is a placeholder
result = review_flow(doc="Draft text to review")
print(result)

In this example, the “Review the document” task won’t be considered complete until both the “Proofread” and “Format” subtasks have finished.
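
The completion rule for parent tasks can be modeled in a few lines of plain Python. The sketch below assumes a hypothetical SketchNode class with a done flag; it only illustrates the rule described above and is not how ControlFlow tracks task state.

```python
from dataclasses import dataclass, field


@dataclass
class SketchNode:
    """Hypothetical task node used only to illustrate the completion rule."""

    name: str
    done: bool = False
    subtasks: list["SketchNode"] = field(default_factory=list)

    def is_complete(self) -> bool:
        # A node counts as complete only if it is marked done
        # and every subtask is (recursively) complete.
        return self.done and all(s.is_complete() for s in self.subtasks)


proofread = SketchNode("Proofread")
fmt = SketchNode("Format")
review = SketchNode("Review the document", done=True, subtasks=[proofread, fmt])

print(review.is_complete())  # False: both subtasks are still pending

proofread.done = True
fmt.done = True
print(review.is_complete())  # True: parent and all subtasks are done
```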

Automatic Execution of Dependencies

A key feature of ControlFlow’s dependency management is that you don’t need to explicitly run dependent tasks. When you run a task, ControlFlow automatically executes all of its dependencies, including:

  • Tasks specified in the depends_on parameter
  • Tasks used in the context parameter
  • Subtasks (for parent tasks)

This means that when you run a flow or task, you only need to run or return the final task(s) in the workflow's directed acyclic graph (DAG). ControlFlow ensures that all necessary upstream tasks and subtasks are executed in the correct order.

For example:

import controlflow as cf

@cf.flow
def research_flow():
    gather_sources = cf.Task("Gather research sources", result_type=list[str])
    
    analyze_sources = cf.Task(
        "Analyze gathered sources",
        result_type=dict,
        context={"sources": gather_sources}
    )
    
    write_report = cf.Task(
        "Write research report",
        result_type=str,
        depends_on=[analyze_sources]
    )
    
    # Only need to return or run the final task
    return write_report

result = research_flow()
print(result)

In this example, running write_report requires analyze_sources, which in turn requires gather_sources. ControlFlow therefore runs gather_sources first, then analyze_sources, then write_report. You don’t need to explicitly run or return gather_sources or analyze_sources.
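
One way to picture this resolution is as a topological sort over the dependency graph. The sketch below uses Python's standard-library graphlib module (Python 3.9+) on a hand-written edge map. The upstream dictionary and the execution_order function are hypothetical, and the sketch makes no claim about how ControlFlow actually schedules tasks.

```python
from graphlib import TopologicalSorter

# Hypothetical edge map: each task name maps to the tasks it waits on.
upstream = {
    "write_report": {"analyze_sources"},
    "analyze_sources": {"gather_sources"},
    "gather_sources": set(),
}


def execution_order(final_task: str, graph: dict[str, set[str]]) -> list[str]:
    """Return the final task and everything upstream of it, dependencies first."""
    # Walk backwards from the final task to collect only the tasks it needs.
    needed: dict[str, set[str]] = {}
    stack = [final_task]
    while stack:
        name = stack.pop()
        if name not in needed:
            needed[name] = graph.get(name, set())
            stack.extend(needed[name])
    # TopologicalSorter expects node -> predecessors and yields predecessors first.
    return list(TopologicalSorter(needed).static_order())


print(execution_order("write_report", upstream))
# ['gather_sources', 'analyze_sources', 'write_report']
```

Asking for only the final task is enough here because the backward walk discovers every task it transitively depends on.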