MODULE 5 OF 15

Task Dependencies

Build complex pipeline structures with branching, conditions, and trigger rules!

What are Dependencies?

Dependencies are the rules that control the order tasks run in. They tell Airflow: "Task B can only start after Task A has finished." Without dependencies, your pipeline would be chaos: tasks could run in any order, or all at once!

Explain Like I'm 5

Imagine baking a cake. You can't frost a cake before you bake it! You can't put it in the oven before you mix the batter! Dependencies are like recipe steps — each step has to happen in the right order. Airflow dependencies do the same thing for your data pipeline: they make sure every "recipe step" (task) happens at the right time.

Linear Dependencies

The simplest dependency type: one task after another, like a straight line. You've already seen this with the >> operator!

The >> and << Operators

A >> B = A is "upstream" of B. A runs first, then B. (B is downstream of A)

B << A = Same thing! B is downstream of A. Both mean "A before B."

The Assembly Line

Linear dependencies are like an assembly line at a toy factory: Step 1 (cut plastic) → Step 2 (assemble) → Step 3 (paint) → Step 4 (pack). Each step feeds the next. You never paint before assembling!

Chaining

You can chain multiple tasks: A >> B >> C means A runs first, then B, then C.

Key Takeaway

A >> B >> C is the same as A >> B and B >> C. The >> operator connects tasks in a chain.

Fan-out Dependencies

Fan-out means one task feeds into multiple tasks. Think: one starting point, many branches going out — like a hand with fingers!

Explain Like I'm 5

Imagine you're the chef and you've just made a big pot of tomato sauce. Now you can use it for pasta, pizza, and lasagna — all at the same time! One pot of sauce (Task A) feeds three different dishes (Tasks B, C, D). That's fan-out!

Syntax: A >> [B, C, D] means "A runs first, then B, C, and D can all run in parallel."

Fan-in Dependencies

Fan-in is the opposite: multiple tasks feed into one. Many branches converge into a single point.

The Highway Merge

Picture three highways merging into one. Cars from Highway B, C, and D all need to merge onto Highway E. The merge point (Task E) can only start once all incoming roads (B, C, D) have delivered their traffic. In Airflow: [B, C, D] >> E means E waits for B, C, and D to finish.

The Diamond Pattern

The diamond combines fan-out and fan-in: one task splits into many, then many merge back into one. It looks like a diamond (◇)!

Example: A >> [B, C] >> D

  • A runs first
  • B and C run in parallel (both depend on A)
  • D runs only after both B and C finish

Real-World Example

Fetch data (A) → Clean US data (B) and Clean EU data (C) in parallel → Merge and load (D). The diamond pattern is incredibly common in real ETL pipelines!

Branching with BranchPythonOperator

Branching lets you choose different paths at runtime. Based on some condition, you decide which tasks to run and which to skip.

Dynamic Path Selection

The branch function returns the task_id(s) of the next task(s) to run. All other tasks directly downstream of the branch operator get skipped.

Skipping Tasks

When you branch, only the task(s) you return get executed. Everything else in the branch is marked as "skipped" (a valid state, not a failure!).

The Branch Function Returns task_id(s)

Your branch function must return a single task_id string, or a list of task_ids. Airflow will only run those tasks. Any downstream task that isn't in the path will be skipped.

Explain Like I'm 5

Imagine a choose-your-own-adventure book. "If it's raining, turn to page 10. If it's sunny, turn to page 20." The branch operator is like that — it checks the weather (or data, or date) and picks which path to take. The other path doesn't run at all!

Conditional Tasks

Sometimes you want to conditionally skip tasks without full branching. Two approaches:

1. Conditions Inside Tasks

You can add if/else logic inside your task's Python code. The task runs, but it might do nothing or do something different based on a condition. This doesn't change the DAG structure.

2. ShortCircuitOperator

The ShortCircuitOperator is a special operator. If its condition returns False, it short-circuits: it skips all downstream tasks. If it returns True, downstream tasks run normally. Great for "should we continue?" checks.

ShortCircuit vs Branch

BranchPythonOperator: Picks which path to take (path A or path B).
ShortCircuitOperator: Decides "continue or stop." If stop, everything downstream is skipped.

Trigger Rules (VERY Important!)

By default, a task runs when all its upstream tasks have succeeded. But sometimes you want different behavior! Trigger rules control when a task is allowed to run based on its upstream tasks' states.

Trigger Rule     When Does the Task Run?
all_success      Default! All upstream tasks must succeed.
all_failed       All upstream tasks must have failed. (Useful for cleanup!)
all_done         All upstream tasks finished (success, failed, or skipped).
one_success      At least one upstream task succeeded.
one_failed       At least one upstream task failed.
none_failed      No upstream tasks failed (success or skipped are OK).
none_skipped     No upstream tasks were skipped.

How Trigger Rules Interact with Branching

When you use branching, some tasks get skipped. If a downstream task uses all_success (the default), it gets skipped too whenever any of its upstream tasks was skipped, because skipped counts as "not success." Use none_failed or all_done on tasks downstream of branches so your join task runs even when some branches were skipped!

Pro Tip

After a BranchPythonOperator, your "join" task often needs trigger_rule="none_failed" or trigger_rule="all_done". Otherwise the join inherits the skip: with the default all_success, the skipped branch counts as "not success," and the join gets marked skipped right along with it.

set_upstream() and set_downstream()

An alternative to >> and <<: you can use methods instead!

  • task_a.set_downstream(task_b) = A >> B
  • task_b.set_upstream(task_a) = same thing!

Some people prefer these when building dependencies dynamically in loops. Both styles work!

Cross-DAG Dependencies (ExternalTaskSensor Preview)

Sometimes Task A in DAG 1 must wait for Task B in DAG 2 to finish. That's a cross-DAG dependency. Airflow's ExternalTaskSensor does exactly that: it waits (polls) until a task in another DAG has completed.

Explain Like I'm 5

Imagine two friends baking. Friend 1 makes the cake. Friend 2 makes the frosting. Friend 2 can't frost until Friend 1's cake is done! The ExternalTaskSensor is Friend 2 peeking into Friend 1's kitchen: "Is the cake ready yet? Is it ready? Okay, now I'll frost!"

Task Groups for Organization

Task Groups let you group related tasks together in the UI. They don't change execution logic — they're for organization and readability. Instead of 20 flat tasks, you get collapsible groups like "Fetch Data", "Transform", "Load".

Why Use Task Groups?

When your DAG has 50+ tasks, the Graph View becomes a mess. Task Groups collapse related tasks into a single box. You can also set dependencies on entire groups: group1 >> group2.

Dependency Patterns

Visual diagrams for each dependency pattern:

1. Linear Chain (A → B → C)

   A → B → C

Linear: A >> B >> C

2. Fan-out (A → B, C, D)

   A → [B, C, D]

Fan-out: A >> [B, C, D]

3. Fan-in ([B, C, D] → E)

   [B, C, D] → E

Fan-in: [B, C, D] >> E

4. Diamond Pattern (A → [B, C] → D)

   A → [B, C] → D

Diamond: A >> [B, C] >> D

Branch Decision Flow

When a branch runs, one path executes and the other is skipped:

   Start → Branch → Path A (chosen)
                  ↘ Path B (skipped)

Trigger Rules Decision Flow

When does a task run? It depends on its trigger_rule!

  • all_success → all upstream ✓
  • one_success → at least one upstream ✓
  • all_done → all upstream finished (any state)
  • none_failed → no upstream ✗ (success or skipped OK)
  • all_failed → all upstream ✗ (for cleanup tasks)

Task State Propagation

How failure cascades: when Task B fails, downstream tasks (with default trigger_rule) won't run:

   A ✓ → B ✗ → C (upstream_failed) → D (never ran)

Failure at B prevents C and D from running (default trigger_rule)

Linear, Fan-out, Fan-in, Diamond

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

with DAG(
    dag_id="dependency_patterns_dag",
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,
) as dag:

    # Linear: A >> B >> C
    a = BashOperator(task_id="task_a", bash_command='echo "A"')
    b = BashOperator(task_id="task_b", bash_command='echo "B"')
    c = BashOperator(task_id="task_c", bash_command='echo "C"')
    a >> b >> c

    # Fan-out: one feeds many
    start = BashOperator(task_id="start", bash_command='echo "Start"')
    x = BashOperator(task_id="task_x", bash_command='echo "X"')
    y = BashOperator(task_id="task_y", bash_command='echo "Y"')
    z = BashOperator(task_id="task_z", bash_command='echo "Z"')
    start >> [x, y, z]

    # Fan-in: many feed one
    join = BashOperator(task_id="join", bash_command='echo "Join"')
    [x, y, z] >> join

    # Diamond: A >> [B, C] >> D
    fetch = BashOperator(task_id="fetch", bash_command='echo "Fetch"')
    clean_a = BashOperator(task_id="clean_a", bash_command='echo "Clean A"')
    clean_b = BashOperator(task_id="clean_b", bash_command='echo "Clean B"')
    merge = BashOperator(task_id="merge", bash_command='echo "Merge"')
    fetch >> [clean_a, clean_b] >> merge

Branching Example

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import BranchPythonOperator
from datetime import datetime

def choose_path(**context):
    # Example: branch based on execution date (weekday)
    exec_date = context["logical_date"]
    if exec_date.weekday() < 5:  # Mon-Fri
        return "weekday_task"
    else:
        return "weekend_task"

with DAG(
    dag_id="branching_dag",
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,
) as dag:

    branch = BranchPythonOperator(
        task_id="branch",
        python_callable=choose_path,
    )

    weekday_task = BashOperator(task_id="weekday_task", bash_command='echo "Weekday!"')
    weekend_task = BashOperator(task_id="weekend_task", bash_command='echo "Weekend!"')

    join = BashOperator(
        task_id="join",
        bash_command='echo "Done"',
        trigger_rule="none_failed",  # Critical! One branch is skipped
    )

    branch >> [weekday_task, weekend_task]
    [weekday_task, weekend_task] >> join

Trigger Rules Example

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.trigger_rule import TriggerRule
from datetime import datetime

with DAG(
    dag_id="trigger_rules_dag",
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,
) as dag:

    t1 = BashOperator(task_id="t1", bash_command='echo "t1"')
    t2 = BashOperator(task_id="t2", bash_command='exit 1')  # Fails!
    t3 = BashOperator(task_id="t3", bash_command='echo "t3"')

    # cleanup runs when at least one upstream fails (one_failed)
    cleanup = BashOperator(
        task_id="cleanup",
        bash_command='echo "Notify on failure"',
        trigger_rule=TriggerRule.ONE_FAILED,
    )

    t1 >> [t2, t3] >> cleanup

Task Groups Example

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.task_group import TaskGroup
from datetime import datetime

with DAG(
    dag_id="task_groups_dag",
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,
) as dag:

    with TaskGroup("fetch_data") as fetch_group:
        fetch_a = BashOperator(task_id="fetch_a", bash_command='echo "Fetch A"')
        fetch_b = BashOperator(task_id="fetch_b", bash_command='echo "Fetch B"')

    with TaskGroup("transform") as transform_group:
        transform = BashOperator(task_id="transform", bash_command='echo "Transform"')

    with TaskGroup("load") as load_group:
        load = BashOperator(task_id="load", bash_command='echo "Load"')

    fetch_group >> transform_group >> load_group

Practice Exercises

Exercise 1: Build a Diamond DAG

Challenge

Create a DAG that: (1) fetches data from an API, (2) runs two parallel "clean" tasks (clean_sales and clean_inventory), (3) merges the cleaned data. Use the diamond pattern: fetch >> [clean_sales, clean_inventory] >> merge.

Hint

Each task can be a BashOperator with echo for now. The key is getting the dependency syntax right: fetch >> [clean_sales, clean_inventory] >> merge.

Exercise 2: Implement Branching

Challenge

Use BranchPythonOperator to branch based on the day of the month: if the day is 1–15, run "first_half_task"; otherwise run "second_half_task". Make sure your "join" task runs with trigger_rule="none_failed".

Exercise 3: Use Trigger Rules

Challenge

Create a DAG with two parallel tasks where one might fail. Add a "notify_on_failure" task that runs only when at least one upstream task has failed. (Hint: trigger_rule="one_failed")

Exercise 4: Task Groups

Challenge

Refactor a flat DAG (5+ tasks) into Task Groups: "extract", "transform", "load". Each group should contain at least one task. Set dependencies between groups: extract_group >> transform_group >> load_group.

Module 5 Quiz

Test your understanding with the questions below.

1. What does A >> B mean in Airflow?

2. What pattern does A >> [B, C, D] represent?

3. What does the BranchPythonOperator's callable return?

4. What is the default trigger rule for a task?

5. After a BranchPythonOperator, which trigger_rule is often needed for the join task?

6. What does ShortCircuitOperator do when it returns False?

7. [B, C, D] >> E creates which pattern?

8. Task Groups are primarily used for ___?