Build complex pipeline structures with branching, conditions, and trigger rules!
Dependencies are the rules that control the order tasks run in. They tell Airflow: "Task B can only start after Task A has finished." Without dependencies, your pipeline would be chaos — tasks would run in random order!
Imagine baking a cake. You can't frost a cake before you bake it! You can't put it in the oven before you mix the batter! Dependencies are like recipe steps — each step has to happen in the right order. Airflow dependencies do the same thing for your data pipeline: they make sure every "recipe step" (task) happens at the right time.
The simplest dependency type: one task after another, like a straight line. You've already seen this with the >> operator!
A >> B = A is "upstream" of B. A runs first, then B. (B is downstream of A)
B << A = Same thing! B is downstream of A. Both mean "A before B."
Linear dependencies are like an assembly line at a toy factory: Step 1 (cut plastic) → Step 2 (assemble) → Step 3 (paint) → Step 4 (pack). Each step feeds the next. You never paint before assembling!
You can chain multiple tasks: A >> B >> C means A runs first, then B, then C.
A >> B >> C is the same as A >> B and B >> C. The >> operator connects tasks in a chain.
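If you prefer a function call over operators, Airflow also ships a chain() helper that builds the same linear chain. Here's a minimal sketch (the DAG id and task ids are made up just for illustration):

```python
from datetime import datetime

from airflow import DAG
from airflow.models.baseoperator import chain
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="linear_chain_demo",
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,
) as dag:
    a = BashOperator(task_id="a", bash_command='echo "A"')
    b = BashOperator(task_id="b", bash_command='echo "B"')
    c = BashOperator(task_id="c", bash_command='echo "C"')

    a >> b >> c        # linear chain with the >> operator
    # chain(a, b, c)   # equivalent, using Airflow's chain() helper
```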
Fan-out means one task feeds into multiple tasks. Think: one starting point, many branches going out — like a hand with fingers!
Imagine you're the chef and you've just made a big pot of tomato sauce. Now you can use it for pasta, pizza, and lasagna — all at the same time! One pot of sauce (Task A) feeds three different dishes (Tasks B, C, D). That's fan-out!
Syntax: A >> [B, C, D] means "A runs first, then B, C, and D can all run in parallel."
Fan-in is the opposite: multiple tasks feed into one. Many branches converge into a single point.
Picture three highways merging into one. Cars from Highway B, C, and D all need to merge onto Highway E. The merge point (Task E) can only start once all incoming roads (B, C, D) have delivered their traffic. In Airflow: [B, C, D] >> E means E waits for B, C, and D to finish.
The diamond combines fan-out and fan-in: one task splits into many, then many merge back into one. It looks like a diamond (◇)!
Example: A >> [B, C] >> D
Fetch data (A) → Clean US data (B) and Clean EU data (C) in parallel → Merge and load (D). The diamond pattern is incredibly common in real ETL pipelines!
Branching lets you choose different paths at runtime. Based on some condition, you decide which tasks to run and which to skip.
The branch function returns the task_id(s) of the next task(s) to run. All other downstream tasks in that branch get skipped.
When you branch, only the task(s) you return get executed. Everything else in the branch is marked as "skipped" (a valid state, not a failure!).
Your branch function must return a single task_id string, or a list of task_ids. Airflow will only run those tasks. Any downstream task that isn't in the path will be skipped.
Imagine a choose-your-own-adventure book. "If it's raining, turn to page 10. If it's sunny, turn to page 20." The branch operator is like that — it checks the weather (or data, or date) and picks which path to take. The other path doesn't run at all!
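The branch callable itself is just a Python function. As a tiny sketch (the condition and task ids are hypothetical), it can return either a single task_id or a list of task_ids — every immediate downstream task you don't return gets skipped:

```python
def choose_paths(**context):
    # Hypothetical rule: on Mondays, refresh both reports; otherwise only the daily one.
    # Any branch task not returned here is marked as skipped.
    if context["logical_date"].weekday() == 0:  # Monday
        return ["daily_report", "weekly_report"]
    return "daily_report"
```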
Sometimes you want to conditionally skip tasks without full branching. Two approaches:
You can add if/else logic inside your task's Python code. The task runs, but it might do nothing or do something different based on a condition. This doesn't change the DAG structure.
The ShortCircuitOperator is a special operator. If its condition returns False, it short-circuits: it skips all downstream tasks. If it returns True, downstream tasks run normally. Great for "should we continue?" checks.
BranchPythonOperator: Picks which path to take (path A or path B).
ShortCircuitOperator: Decides "continue or stop." If stop, everything downstream is skipped.
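Here's a minimal sketch of a ShortCircuitOperator "should we continue?" check (the DAG id, task names, and the weekday condition are just placeholders):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import ShortCircuitOperator


def should_continue(**context):
    # Illustrative condition: only continue on weekdays.
    # Returning False short-circuits: all downstream tasks are skipped.
    return context["logical_date"].weekday() < 5


with DAG(
    dag_id="short_circuit_demo",
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,
) as dag:
    check = ShortCircuitOperator(
        task_id="should_we_continue",
        python_callable=should_continue,
    )
    process = BashOperator(task_id="process", bash_command='echo "Processing"')
    report = BashOperator(task_id="report", bash_command='echo "Reporting"')

    check >> process >> report  # both are skipped if the check returns False
```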
By default, a task runs when all its upstream tasks have succeeded. But sometimes you want different behavior! Trigger rules control when a task is allowed to run based on its upstream tasks' states.
| Trigger Rule | When Does the Task Run? |
|---|---|
| all_success | Default! All upstream tasks must succeed. |
| all_failed | All upstream tasks must have failed. (Useful for cleanup!) |
| all_done | All upstream tasks finished (success, failed, or skipped). |
| one_success | At least one upstream task succeeded. |
| one_failed | At least one upstream task failed. |
| none_failed | No upstream tasks failed (success or skipped are OK). |
| none_skipped | No upstream tasks were skipped. |
When you use branching, some tasks get skipped. If a downstream task uses all_success (default), it will never run if an upstream task was skipped (skipped counts as "not success"). Use none_failed or all_done when downstream of branches so your join task runs even when some branches were skipped!
After a BranchPythonOperator, your "join" task often needs trigger_rule="none_failed" or trigger_rule="all_done". Otherwise, with the default all_success, the skipped branch counts against it and the join task gets skipped too — so it never runs!
An alternative to >> and <<: you can use methods instead!
task_a.set_downstream(task_b) = A >> B
task_b.set_upstream(task_a) = same thing!
Some people prefer these when building dependencies dynamically in loops. Both styles work!
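For example, here's a minimal sketch (hypothetical DAG and task ids) of building a fan-out/fan-in dynamically in a loop with set_downstream:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="dynamic_dependencies_demo",
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,
) as dag:
    start = BashOperator(task_id="start", bash_command='echo "Start"')
    end = BashOperator(task_id="end", bash_command='echo "End"')

    # Build start >> worker_N >> end for each worker, without writing each line by hand
    for i in range(3):
        worker = BashOperator(task_id=f"worker_{i}", bash_command=f'echo "Worker {i}"')
        start.set_downstream(worker)  # same as: start >> worker
        worker.set_downstream(end)    # same as: worker >> end
```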
Sometimes Task A in DAG 1 must wait for Task B in DAG 2 to finish. That's a cross-DAG dependency. Airflow's ExternalTaskSensor does exactly that: it waits (polls) until a task in another DAG has completed.
Imagine two friends baking. Friend 1 makes the cake. Friend 2 makes the frosting. Friend 2 can't frost until Friend 1's cake is done! The ExternalTaskSensor is Friend 2 peeking into Friend 1's kitchen: "Is the cake ready yet? Is it ready? Okay, now I'll frost!"
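Here's a minimal sketch of that "Friend 2 waits for Friend 1" setup (the DAG ids and task ids are made-up placeholders); by default the sensor looks for the other DAG's task in the run with the same logical date:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.sensors.external_task import ExternalTaskSensor

with DAG(
    dag_id="frosting_dag",
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,
) as dag:
    # Poll until task "bake_cake" in DAG "cake_dag" (hypothetical ids) has finished.
    wait_for_cake = ExternalTaskSensor(
        task_id="wait_for_cake",
        external_dag_id="cake_dag",
        external_task_id="bake_cake",
        poke_interval=60,    # check every 60 seconds
        timeout=60 * 60,     # give up after an hour
        mode="reschedule",   # free the worker slot between checks
    )
    frost = BashOperator(task_id="frost_cake", bash_command='echo "Frosting!"')

    wait_for_cake >> frost
```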
Task Groups let you group related tasks together in the UI. They don't change execution logic — they're for organization and readability. Instead of 20 flat tasks, you get collapsible groups like "Fetch Data", "Transform", "Load".
When your DAG has 50+ tasks, the Graph View becomes a mess. Task Groups collapse related tasks into a single box. You can also set dependencies on entire groups: group1 >> group2.
Visual diagrams for each dependency pattern:
Linear: A >> B >> C
Fan-out: A >> [B, C, D]
Fan-in: [B, C, D] >> E
Diamond: A >> [B, C] >> D
When a branch runs, one path executes and the other is skipped:
Branch Decision Flow
all_success → All upstream ✓
one_success → At least one upstream ✓
all_done → All upstream finished (any state)
none_failed → No upstream ✗ (success or skipped OK)
all_failed → All upstream ✗ (for cleanup tasks)
How failure cascades: when Task B fails, downstream tasks (with default trigger_rule) won't run:
Failure at B prevents C and D from running (default trigger_rule)
from airflow import DAG from airflow.operators.bash import BashOperator from datetime import datetime with DAG( dag_id="dependency_patterns_dag", start_date=datetime(2024, 1, 1), schedule="@daily", catchup=False, ) as dag: # Linear: A >> B >> C a = BashOperator(task_id="task_a", bash_command='echo "A"') b = BashOperator(task_id="task_b", bash_command='echo "B"') c = BashOperator(task_id="task_c", bash_command='echo "C"') a >> b >> c # Fan-out: one feeds many start = BashOperator(task_id="start", bash_command='echo "Start"') x = BashOperator(task_id="task_x", bash_command='echo "X"') y = BashOperator(task_id="task_y", bash_command='echo "Y"') z = BashOperator(task_id="task_z", bash_command='echo "Z"') start >> [x, y, z] # Fan-in: many feed one join = BashOperator(task_id="join", bash_command='echo "Join"') [x, y, z] >> join # Diamond: A >> [B, C] >> D fetch = BashOperator(task_id="fetch", bash_command='echo "Fetch"') clean_a = BashOperator(task_id="clean_a", bash_command='echo "Clean A"') clean_b = BashOperator(task_id="clean_b", bash_command='echo "Clean B"') merge = BashOperator(task_id="merge", bash_command='echo "Merge"') fetch >> [clean_a, clean_b] >> merge
```python
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import BranchPythonOperator
from datetime import datetime


def choose_path(**context):
    # Example: branch based on execution date (weekday)
    exec_date = context["logical_date"]
    if exec_date.weekday() < 5:  # Mon-Fri
        return "weekday_task"
    else:
        return "weekend_task"


with DAG(
    dag_id="branching_dag",
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,
) as dag:
    branch = BranchPythonOperator(
        task_id="branch",
        python_callable=choose_path,
    )
    weekday_task = BashOperator(task_id="weekday_task", bash_command='echo "Weekday!"')
    weekend_task = BashOperator(task_id="weekend_task", bash_command='echo "Weekend!"')
    join = BashOperator(
        task_id="join",
        bash_command='echo "Done"',
        trigger_rule="none_failed",  # Critical! One branch is skipped
    )

    branch >> [weekday_task, weekend_task]
    [weekday_task, weekend_task] >> join
```
```python
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.trigger_rule import TriggerRule
from datetime import datetime

with DAG(
    dag_id="trigger_rules_dag",
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,
) as dag:
    t1 = BashOperator(task_id="t1", bash_command='echo "t1"')
    t2 = BashOperator(task_id="t2", bash_command='exit 1')  # Fails!
    t3 = BashOperator(task_id="t3", bash_command='echo "t3"')

    # cleanup runs when at least one upstream fails (one_failed)
    cleanup = BashOperator(
        task_id="cleanup",
        bash_command='echo "Notify on failure"',
        trigger_rule=TriggerRule.ONE_FAILED,
    )

    t1 >> [t2, t3] >> cleanup
```
```python
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.task_group import TaskGroup
from datetime import datetime

with DAG(
    dag_id="task_groups_dag",
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,
) as dag:
    with TaskGroup("fetch_data") as fetch_group:
        fetch_a = BashOperator(task_id="fetch_a", bash_command='echo "Fetch A"')
        fetch_b = BashOperator(task_id="fetch_b", bash_command='echo "Fetch B"')

    with TaskGroup("transform") as transform_group:
        transform = BashOperator(task_id="transform", bash_command='echo "Transform"')

    with TaskGroup("load") as load_group:
        load = BashOperator(task_id="load", bash_command='echo "Load"')

    fetch_group >> transform_group >> load_group
```
Create a DAG that: (1) fetches data from an API, (2) runs two parallel "clean" tasks (clean_sales and clean_inventory), (3) merges the cleaned data. Use the diamond pattern: fetch >> [clean_sales, clean_inventory] >> merge.
Each task can be a BashOperator with echo for now. The key is getting the dependency syntax right: fetch >> [clean_sales, clean_inventory] >> merge.
Use BranchPythonOperator to branch based on the day of the month: if the day is 1–15, run "first_half_task"; otherwise run "second_half_task". Make sure your "join" task runs with trigger_rule="none_failed".
Create a DAG with two parallel tasks where one might fail. Add a "notify_on_failure" task that runs only when at least one upstream task has failed. (Hint: trigger_rule="one_failed")
Refactor a flat DAG (5+ tasks) into Task Groups: "extract", "transform", "load". Each group should contain at least one task. Set dependencies between groups: extract_group >> transform_group >> load_group.