MODULE 7 OF 15

Sensors & Triggers

Wait for files, events, and conditions before proceeding!

What are Sensors?

Sensors are special Airflow operators that don't do work immediately — they wait for a condition to be met before allowing the pipeline to continue. While regular operators fetch data or run calculations, sensors sit and check until something happens.

Explain Like I'm 5

Like waiting for the pizza delivery guy — you keep checking the door until the pizza arrives. You don't eat before it gets there, and you don't give up after one peek. You check, wait, check again, wait... until finally the pizza (or file, or API response) shows up. Then you proceed!

Sensors are perfect for pipelines that depend on external events: a file landing in a folder, another DAG finishing, an API returning success, or a database query returning results. Without sensors, you'd have to guess when to run — and often fail.

Key Idea

Sensors = Operators that WAIT for a condition. They poke (check) repeatedly until the condition is True, then succeed and let downstream tasks run.

How Sensors Work

Every sensor inherits from BaseSensorOperator and implements a poke() method. The sensor keeps calling poke() until it returns True (condition met) or hits a timeout.

Key Parameters

  1. poke() — Runs repeatedly. Return True when the condition is met; False to try again later.
  2. timeout — Max seconds to wait. After this, the sensor fails (unless soft_fail=True).
  3. poke_interval — Seconds between each poke. Default is 60. Lower = more frequent checks, more load.
  4. soft_fail — If True, timeout results in "skipped" instead of "failed". Useful for optional waits.
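
Here's a quick sketch showing these knobs together on one sensor. The path, task id, and values are placeholders, not part of this module's later examples:

from airflow.sensors.filesystem import FileSensor

# Declare inside a with DAG(...) block, as in the full examples later in this module
optional_wait = FileSensor(
    task_id="optional_wait",
    filepath="/data/optional_extra.csv",  # placeholder path
    poke_interval=120,  # poke every 2 minutes
    timeout=1800,       # give up after 30 minutes...
    soft_fail=True,     # ...and mark the task "skipped" instead of "failed"
)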

Sensor Loop (Simplified)

while not poke():
    sleep(poke_interval)
    if elapsed > timeout:
        fail()

Poke Mode vs Reschedule Mode

Poke Mode (default)

Occupies a worker slot while waiting. The task sits in "running" state and keeps the worker busy checking every poke_interval. Bad for long waits — you burn worker capacity!

Reschedule Mode

Releases the slot and reschedules itself for later. Worker is free to run other tasks. When poke_interval elapses, the sensor runs again. Better for long waits!

Set mode="reschedule" on your sensor to opt in (see the sketch after the list below). Keep mode="poke" (the default) only when waits are short (seconds to a few minutes).

When to Use Which

Poke: Short waits (< 5 min), need fast reaction. Reschedule: Long waits (hours), limited workers, many concurrent sensors.
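
A long wait in reschedule mode might look like this sketch (path and timings are placeholders; the full FileSensor DAG later in this module uses the same mode):

from airflow.sensors.filesystem import FileSensor

wait_long = FileSensor(
    task_id="wait_long",
    filepath="/data/incoming/big_export.csv",  # placeholder path
    mode="reschedule",    # free the worker slot between pokes
    poke_interval=600,    # re-check every 10 minutes
    timeout=6 * 60 * 60,  # fail after 6 hours of waiting
)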

FileSensor

Waits for a file (or directory) to appear. Use when an upstream process drops files you need to process.

  • filepath — Path to the file or directory (supports Jinja templating and glob patterns like *.csv)
  • fs_conn_id — Filesystem connection that supplies the base path (default fs_default, the local filesystem)
  • Cloud storage — for S3 or GCS, use the provider-specific sensors instead (e.g. S3KeySensor, GCSObjectExistenceSensor)

Example

Wait for /data/daily_report_{{ ds }}.csv before running your analytics DAG. The FileSensor checks every minute until the file appears.

ExternalTaskSensor

Waits for a task in another DAG to finish. Perfect for cross-DAG dependencies (e.g. "run my DAG only after the ingestion DAG has loaded today's data").

  • external_dag_id — The DAG you're waiting on
  • external_task_id — The specific task (or None for whole DAG)
  • execution_delta / execution_date_fn — Align execution dates (e.g. wait for same logical date)

Execution Date Alignment

Your DAG run and the external DAG run must align on logical date. If the two DAGs run at different times, use execution_delta: for example, if ingestion_dag runs at 1 AM and your DAG runs at 2 AM, execution_delta=timedelta(hours=1) points the sensor at the ingestion run one hour earlier. This is tricky; read the docs!

HttpSensor

Waits for an HTTP endpoint to return success (e.g. 200). Use when an external API or service must be ready before you proceed.

  • http_conn_id — Connection with base URL
  • endpoint — Path to hit (e.g. /health)
  • response_check — Optional function to validate the response (e.g. check JSON field)

Analogy

Like calling a friend to see if they're home before you drive over. You keep calling until they pick up (success)!

SqlSensor

Runs a SQL query and succeeds when it returns at least one row. Use to wait for data to arrive in a table (e.g. "wait until today's batch appears").

Tip

Your query should return a row when the condition is met. Example: SELECT 1 FROM events WHERE date = '{{ ds }}' LIMIT 1

DateTimeSensor

Waits until a specific datetime. Useful for "run this task at exactly 9:00 AM" or "wait until market opens."
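
No full example appears later for this one, so here's a minimal sketch. The task id is made up, and the templated target_time assumes a daily schedule so that data_interval_end falls on the day you care about:

from airflow.sensors.date_time import DateTimeSensor

wait_for_market_open = DateTimeSensor(
    task_id="wait_for_market_open",
    # Templated: resolves to 09:30 on the run's day
    target_time="{{ data_interval_end.replace(hour=9, minute=30, second=0) }}",
    mode="reschedule",  # don't hold a worker while waiting
)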

Custom Sensors

Create your own by extending BaseSensorOperator and implementing poke(self, context). Return True when done, False to retry.

from airflow.sensors.base import BaseSensorOperator

class MyCustomSensor(BaseSensorOperator):
    def poke(self, context):
        # Your logic here
        return some_condition_met  # True or False
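
As a concrete (hypothetical) illustration, a custom sensor that waits for a local file to reach a minimum size might look like this:

import os

from airflow.sensors.base import BaseSensorOperator

class FileSizeSensor(BaseSensorOperator):
    """Hypothetical sensor: succeeds once filepath exists with at least min_bytes."""

    def __init__(self, filepath, min_bytes=1, **kwargs):
        super().__init__(**kwargs)
        self.filepath = filepath
        self.min_bytes = min_bytes

    def poke(self, context):
        # True ends the wait; False means "check again after poke_interval"
        return (
            os.path.exists(self.filepath)
            and os.path.getsize(self.filepath) >= self.min_bytes
        )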

TriggerDagRunOperator

Triggers another DAG from within your DAG. Pass conf to send configuration (e.g. date, parameters). The triggered DAG runs asynchronously.

Example

DAG A finishes loading data. It triggers DAG B with conf={"date": "{{ ds }}"}. DAG B can read this via dag_run.conf.
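
On the receiving side, a task in DAG B could read that configuration like this sketch (the task id and callable are made up):

from airflow.operators.python import PythonOperator

def use_conf(dag_run=None, **kwargs):
    # dag_run.conf holds whatever the parent passed via TriggerDagRunOperator's conf
    conf = dag_run.conf or {}
    print(f"Triggered for date={conf.get('date')} from source={conf.get('source')}")

# Declare inside DAG B's with DAG(...) block
read_conf = PythonOperator(task_id="read_conf", python_callable=use_conf)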

Starting Workflows: REST API & CLI

Trigger DAGs without the UI:

  • CLI: airflow dags trigger my_dag
  • REST API: POST /api/v1/dags/{dag_id}/dagRuns with optional conf
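
A minimal sketch of the REST call using the requests library (the URL, credentials, and DAG id are placeholders; assumes the basic-auth API backend is enabled on your deployment):

import requests

resp = requests.post(
    "http://localhost:8080/api/v1/dags/my_dag/dagRuns",
    auth=("admin", "admin"),  # placeholder credentials
    json={"conf": {"date": "2024-01-01"}},  # optional configuration for the run
)
resp.raise_for_status()
print(resp.json()["dag_run_id"])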

Deferrable Operators (Airflow 2.x)

Async sensors that don't occupy workers at all while waiting. They defer to a separate "triggerer" process, which waits on many conditions at once using asyncio, and resume when ready. Best of both worlds: no worker slots held and no per-task polling loop!

Preferred for New Code

Use deferrable variants such as TimeSensorAsync when available (newer Airflow versions and some provider packages also expose *Async sensors or a deferrable=True flag). They're more efficient than classic poke/reschedule sensors.
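
For example, the deferrable time sensor reads just like its classic cousin. A sketch (task id is made up; requires a running triggerer process, Airflow 2.2+):

from datetime import time

from airflow.sensors.time_sensor import TimeSensorAsync

wait_until_nine = TimeSensorAsync(
    task_id="wait_until_nine",
    target_time=time(9, 0),  # defer until 09:00, holding no worker slot
)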


Sensor Lifecycle

From start to success: the sensor loops until the condition is met or the timeout expires.


Airflow Architecture

How Airflow processes a DAG, component by component:

DAG File → Scheduler → Task Queue → Worker → Results in DB → Web UI

The Airflow Web UI

Airflow comes with a stunning web interface. Here's what you'll see:

Key Views in the Airflow UI

DAGs List View

The home page. Shows all your DAGs with their status (running, paused, failed), schedule, owner, and recent task statuses as colored circles.

Graph View

Shows the structure of a DAG as a visual graph — tasks as boxes and dependencies as arrows. Color-coded by task state (green = success, red = failed, yellow = running).

Grid View (formerly Tree View)

Shows the history of all runs over time. Each column is a DAG run, each row is a task. This is the most powerful view for debugging — you can see patterns like "this task fails every Monday."

Code View

Shows the actual Python code of your DAG file directly in the browser. Great for quick debugging without opening your IDE.

Who Uses Airflow?

Airbnb

Created Airflow! Uses it for search ranking, pricing, and host analytics pipelines.

Spotify

Runs 20,000+ DAGs for recommendation engines and content analytics.

Uber

Orchestrates ML model training, pricing calculations, and driver analytics.

Netflix

Manages data pipelines that power content recommendations for 200M+ users.

FileSensor Example

from airflow import DAG
from airflow.sensors.filesystem import FileSensor
from airflow.operators.bash import BashOperator
from datetime import datetime

with DAG(
    dag_id="file_sensor_dag",
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,
) as dag:

    wait_for_file = FileSensor(
        task_id="wait_for_file",
        filepath="/data/reports/{{ ds }}.csv",
        fs_conn_id="fs_default",
        poke_interval=60,
        timeout=3600,
        mode="reschedule",
    )

    process = BashOperator(
        task_id="process_file",
        bash_command="echo 'Processing file...'",
    )

    wait_for_file >> process

ExternalTaskSensor Example

from airflow import DAG
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

with DAG(
    dag_id="downstream_dag",
    start_date=datetime(2024, 1, 1),
    schedule="0 2 * * *",  # 2 AM daily
    catchup=False,
) as dag:

    wait_for_ingestion = ExternalTaskSensor(
        task_id="wait_for_ingestion",
        external_dag_id="ingestion_dag",
        external_task_id="load_complete",
        execution_delta=timedelta(hours=1),
        timeout=7200,
    )

    run_analytics = BashOperator(
        task_id="run_analytics",
        bash_command="echo 'Running analytics...'",
    )

    wait_for_ingestion >> run_analytics

HttpSensor Example

from airflow import DAG
from airflow.providers.http.sensors.http import HttpSensor
from airflow.operators.bash import BashOperator
from datetime import datetime

def check_response(response):
    return response.status_code == 200 and "ready" in response.json().get("status", "")

with DAG(
    dag_id="http_sensor_dag",
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,
) as dag:

    wait_for_api = HttpSensor(
        task_id="wait_for_api",
        http_conn_id="my_api",
        endpoint="/health",
        response_check=check_response,
        poke_interval=30,
    )

    fetch_data = BashOperator(
        task_id="fetch_data",
        bash_command="echo 'API ready, fetching...'",
    )

    wait_for_api >> fetch_data

SqlSensor Example

from airflow import DAG
from airflow.providers.common.sql.sensors.sql import SqlSensor
from airflow.operators.bash import BashOperator
from datetime import datetime

with DAG(
    dag_id="sql_sensor_dag",
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,
) as dag:

    wait_for_data = SqlSensor(
        task_id="wait_for_data",
        conn_id="postgres_default",
        sql="SELECT 1 FROM events WHERE date = '{{ ds }}' LIMIT 1",
        poke_interval=60,
    )

    process = BashOperator(
        task_id="process",
        bash_command="echo 'Data arrived!'",
    )

    wait_for_data >> process

TriggerDagRunOperator Example

from airflow import DAG
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.operators.bash import BashOperator
from datetime import datetime

with DAG(
    dag_id="parent_dag",
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,
) as dag:

    prepare = BashOperator(
        task_id="prepare",
        bash_command="echo 'Data prepared'",
    )

    trigger_child = TriggerDagRunOperator(
        task_id="trigger_child",
        trigger_dag_id="child_dag",
        conf={"date": "{{ ds }}", "source": "parent"},
    )

    prepare >> trigger_child

Practice Exercises

Try these sensor-focused exercises. Don't peek at the answers until you've tried!

Exercise 1: Choose the Right Sensor

Scenario

Your DAG needs to wait until a daily CSV file sales_{{ ds }}.csv appears in /data/ before processing. Which sensor do you use?

Question: Name the sensor and list the key parameters you'd set.

Answer

FileSensor. Key parameters: filepath="/data/sales_{{ ds }}.csv", fs_conn_id (if using remote FS), optionally poke_interval and timeout. Use mode="reschedule" if the wait could be long.

Exercise 2: Poke vs Reschedule

Scenario

You have 5 FileSensors in a DAG, each waiting potentially 2+ hours for files from different upstream systems. Your Airflow cluster has only 4 workers.

Question: What problem will you hit, and how do you fix it?

Answer

Problem: In poke mode, each sensor occupies a worker slot. 5 sensors and 4 workers = one sensor will be queued, and you'll block other DAGs.

Fix: Set mode="reschedule" on all sensors. They'll release the slot between pokes and reschedule themselves. Workers stay free for other tasks.

Exercise 3: Cross-DAG Dependency

Scenario

Your analytics_dag must run only after ingestion_dag's task load_to_warehouse has succeeded for the same logical date.

Question: Write the ExternalTaskSensor configuration (pseudocode or key args).

# Solution:
ExternalTaskSensor(
    task_id="wait_for_ingestion",
    external_dag_id="ingestion_dag",
    external_task_id="load_to_warehouse",
    # Same execution date by default; use execution_delta if schedules differ
)

Exercise 4: Custom Sensor Logic

Scenario

You need to wait until a Redis list has at least 10 items before proceeding. No built-in sensor exists.

Question: Sketch the poke() method for a custom sensor. What should it return when ready? When not ready?

Answer

Implement poke(self, context) that: (1) connects to Redis, (2) gets the list length, (3) returns True if length ≥ 10 (condition met, sensor succeeds), (4) returns False otherwise (Airflow will retry after poke_interval).
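
A solution sketch, assuming the redis-py client (the host, port, and class name are made up):

import redis

from airflow.sensors.base import BaseSensorOperator

class RedisListSensor(BaseSensorOperator):
    def __init__(self, list_key, min_items=10, **kwargs):
        super().__init__(**kwargs)
        self.list_key = list_key
        self.min_items = min_items

    def poke(self, context):
        client = redis.Redis(host="localhost", port=6379)  # placeholder connection
        # True (length >= min_items) succeeds; False retries after poke_interval
        return client.llen(self.list_key) >= self.min_items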

Module 7 Quiz

Test your understanding of Sensors & Triggers! Try to answer each question before checking back through the module.

1. What do Airflow sensors do?

2. In poke mode, what happens while a sensor waits?

3. When should you use reschedule mode for a sensor?

4. Which sensor waits for a task in another DAG to complete?

5. What does the poke() method return when the sensor's condition is met?

6. What does soft_fail=True do when a sensor times out?

7. Which operator triggers another DAG from within your DAG?

8. What is an advantage of deferrable operators (e.g. TimeSensorAsync)?