Dependencies
Manage task dependencies to create complex, multi-step workflows.
In complex workflows, tasks often need to be executed in a specific order. Some tasks may rely on the outputs of others, or there might be a logical sequence that must be followed to achieve the desired outcome. ControlFlow provides several mechanisms to define and manage these task dependencies, ensuring that your workflow executes in the correct order and that data flows properly between tasks.
ControlFlow offers three primary ways to establish dependencies between tasks: sequential dependencies, context dependencies, and subtask relationships. Each method has its own use cases and benefits, allowing you to structure your workflows in the most appropriate way for your specific needs.
Sequential Dependencies
Sequential dependencies are the most straightforward way to specify that one task must wait for another to complete before it can begin. This is done using the depends_on parameter when creating a task.
import controlflow as cf

@cf.flow
def research_flow():
    gather_sources = cf.Task("Gather research sources", result_type=list[str])

    analyze_sources = cf.Task(
        "Analyze gathered sources",
        result_type=dict,
        depends_on=[gather_sources],  # explicit dependency
    )

    return analyze_sources

result = research_flow()
print(result)
In this example, analyze_sources will not start until gather_sources has completed successfully.
Context Dependencies
Context dependencies are created when you use the result of one task as input for another. This creates an implicit dependency between the tasks.
import controlflow as cf

@cf.flow
def research_flow():
    gather_sources = cf.Task("Gather research sources", result_type=list[str])

    analyze_sources = cf.Task(
        "Analyze gathered sources",
        result_type=dict,
        context={"sources": gather_sources},  # implicit dependency
    )

    return analyze_sources

result = research_flow()
print(result)
Here, analyze_sources depends on gather_sources because it needs the sources data to perform its analysis.
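To illustrate how an implicit dependency could be derived from a context value, here is a minimal plain-Python sketch. It does not use ControlFlow and is not how the library is implemented; ToyTask and its upstream() method are hypothetical names introduced only for this illustration. The idea is simply that any context value that is itself a task counts as an upstream dependency.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass(eq=False)
class ToyTask:
    """Hypothetical stand-in for a task; not ControlFlow's Task class."""
    objective: str
    depends_on: list["ToyTask"] = field(default_factory=list)
    context: dict[str, Any] = field(default_factory=dict)

    def upstream(self) -> list["ToyTask"]:
        # Explicit dependencies first, then any task found among context values.
        implicit = [v for v in self.context.values() if isinstance(v, ToyTask)]
        return self.depends_on + implicit


gather = ToyTask("Gather research sources")
analyze = ToyTask(
    "Analyze gathered sources",
    context={"sources": gather, "topic": "AI"},  # "topic" is plain data, not a task
)

upstream_names = [t.objective for t in analyze.upstream()]
print(upstream_names)
```

Only the gather task is reported as upstream; the plain string under "topic" is ordinary data and creates no dependency.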
Subtask Relationships
Subtasks create a hierarchical dependency structure. A parent task is considered complete only when all its subtasks have finished.
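import controlflow as cf

@cf.flow
def review_flow(doc):
    # Tasks created inside the `with` block become subtasks of `review`
    with cf.Task("Review the document", result_type=str, context=dict(doc=doc)) as review:
        cf.Task("Proofread")
        cf.Task("Format")

    return review

# review_flow requires a document; this string is placeholder input
result = review_flow(doc="Text of the document to review")
print(result)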
In this example, the “Review the document” task won’t be considered complete until both the “Proofread” and “Format” subtasks have finished.
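The completion rule for parent tasks can be modeled in a few lines of plain Python. This is a toy sketch under stated assumptions, not ControlFlow's implementation: ToyNode, its done flag, and is_complete() are hypothetical names used only to show the rule that a parent counts as complete only once every subtask is complete.

```python
from dataclasses import dataclass, field


@dataclass
class ToyNode:
    """Hypothetical task node; not ControlFlow's Task class."""
    name: str
    done: bool = False  # whether this node's own work is finished
    subtasks: list["ToyNode"] = field(default_factory=list)

    def is_complete(self) -> bool:
        # Complete only if own work is done and every subtask is complete.
        return self.done and all(s.is_complete() for s in self.subtasks)


proofread = ToyNode("Proofread")
fmt = ToyNode("Format")
review = ToyNode("Review the document", done=True, subtasks=[proofread, fmt])

before = review.is_complete()  # subtasks still pending
proofread.done = True
fmt.done = True
after = review.is_complete()   # all subtasks finished
print(before, after)
```

In this model the parent reports incomplete while either subtask is pending, and complete once both have finished.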
Automatic Execution of Dependencies
A key feature of ControlFlow’s dependency management is that you don’t need to explicitly run a task’s dependencies. When you run a task, ControlFlow automatically executes everything it depends on, including:
- Tasks specified in the depends_on parameter
- Tasks used in the context parameter
- Subtasks (for parent tasks)
This means that when you run a flow or task, you only need to run or return the final task(s) in the workflow DAG. ControlFlow will ensure that all necessary upstream tasks and subtasks are executed in the correct order.
For example:
import controlflow as cf

@cf.flow
def research_flow():
    gather_sources = cf.Task("Gather research sources", result_type=list[str])

    analyze_sources = cf.Task(
        "Analyze gathered sources",
        result_type=dict,
        context={"sources": gather_sources},
    )

    write_report = cf.Task(
        "Write research report",
        result_type=str,
        depends_on=[analyze_sources],
    )

    # Only need to return or run the final task
    return write_report

result = research_flow()
print(result)
In this example, running write_report will automatically trigger the execution of analyze_sources, which in turn will trigger gather_sources. You don’t need to explicitly run or return gather_sources or analyze_sources.
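The ordering described here is a topological sort over the upstream part of the task graph. The sketch below shows that idea with Python's standard graphlib module (Python 3.9+). It is an illustration only and makes no claim about ControlFlow's internals; the graph dictionary, the unrelated_task entry, and the upstream_closure helper are assumptions made for this example.

```python
from graphlib import TopologicalSorter

# Map each task name to the set of tasks it must wait for.
graph = {
    "write_report": {"analyze_sources"},    # via depends_on
    "analyze_sources": {"gather_sources"},  # via context
    "gather_sources": set(),
    "unrelated_task": set(),                # not upstream of the final task
}


def upstream_closure(target: str, graph: dict[str, set[str]]) -> dict[str, set[str]]:
    """Collect the target and everything it transitively depends on."""
    seen: dict[str, set[str]] = {}
    stack = [target]
    while stack:
        node = stack.pop()
        if node not in seen:
            seen[node] = graph.get(node, set())
            stack.extend(seen[node])
    return seen


needed = upstream_closure("write_report", graph)
order = list(TopologicalSorter(needed).static_order())
print(order)
```

Starting from the final task alone is enough to recover every upstream task and an order in which each one runs after its dependencies; the unrelated task is never visited.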