Wait for files, events, and conditions before proceeding!
Sensors are special Airflow operators that don't do work immediately — they wait for a condition to be met before allowing the pipeline to continue. While regular operators fetch data or run calculations, sensors sit and check until something happens.
Like waiting for the pizza delivery guy — you keep checking the door until the pizza arrives. You don't eat before it gets there, and you don't give up after one peek. You check, wait, check again, wait... until finally the pizza (or file, or API response) shows up. Then you proceed!
Sensors are perfect for pipelines that depend on external events: a file landing in a folder, another DAG finishing, an API returning success, or a database query returning results. Without sensors, you'd have to guess when to run — and often fail.
Sensors = Operators that WAIT for a condition. They poke (check) repeatedly until the condition is True, then succeed and let downstream tasks run.
Every sensor inherits from BaseSensorOperator and implements a poke() method. The sensor keeps calling poke() until it returns True (condition met) or hits a timeout.
poke(context): Runs repeatedly. Return True when the condition is met; False to try again later.
timeout: Max seconds to wait. After this, the sensor fails (unless soft_fail=True).
poke_interval: Seconds between each poke. Default is 60. Lower = more frequent checks, more load.
soft_fail: If True, timeout results in "skipped" instead of "failed". Useful for optional waits.
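A quick sketch of how these knobs combine on a sensor; the FileSensor path and connection id below are hypothetical:

from airflow.sensors.filesystem import FileSensor

wait_optional = FileSensor(
    task_id="wait_for_optional_extract",
    filepath="/data/optional_extract.csv",  # hypothetical path
    fs_conn_id="fs_default",
    poke_interval=30,  # check every 30 seconds
    timeout=600,       # give up after 10 minutes
    soft_fail=True,    # on timeout, mark "skipped" instead of "failed"
)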
while not poke():
    sleep(poke_interval)
    if elapsed > timeout:
        fail
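In plain Python, that loop looks roughly like this (a simplified sketch, not Airflow's actual implementation):

import time

def run_sensor(poke, poke_interval=60, timeout=3600):
    # Poke until the condition is met or the timeout elapses
    started = time.monotonic()
    while not poke():
        if time.monotonic() - started > timeout:
            raise TimeoutError("Sensor timed out")  # the task fails
        time.sleep(poke_interval)  # wait before the next check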
Poke mode: Occupies a worker slot while waiting. The task sits in "running" state and keeps the worker busy checking every poke_interval. Bad for long waits — you burn worker capacity!
Reschedule mode: Releases the slot and reschedules itself for later. The worker is free to run other tasks. When poke_interval elapses, the sensor runs again. Better for long waits!
Set mode="reschedule" on your sensor to use reschedule mode. Use mode="poke" (default) only when waits are short (seconds to a few minutes).
Poke: Short waits (< 5 min), need fast reaction. Reschedule: Long waits (hours), limited workers, many concurrent sensors.
Waits for a file (or directory) to appear. Use when an upstream process drops files you need to process.
Tip: a glob pattern like *.csv matches multiple files. Example: wait for /data/daily_report_{{ ds }}.csv before running your analytics DAG. The FileSensor checks every minute until the file appears.
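A minimal sketch of the glob variant (the directory and connection id are assumptions, and glob support depends on your Airflow version):

from airflow.sensors.filesystem import FileSensor

wait_for_any_csv = FileSensor(
    task_id="wait_for_any_csv",
    filepath="/data/incoming/*.csv",  # glob: succeeds once any CSV appears
    fs_conn_id="fs_default",
    poke_interval=60,
)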
Waits for a task in another DAG to finish. Perfect for cross-DAG dependencies (e.g. "run my DAG only after the ingestion DAG has loaded today's data").
Set external_task_id=None to wait for the whole DAG. Your DAG run and the external DAG run must align on the logical date. Use execution_delta (e.g. timedelta(hours=1)) if they run at different times. This is tricky — read the docs!
Waits for an HTTP endpoint to return success (e.g. 200). Use when an external API or service must be ready before you proceed.
Point it at a health-check endpoint (e.g. /health). Like calling a friend to see if they're home before you drive over. You keep calling until they pick up (success)!
Runs a SQL query and succeeds when it returns at least one row. Use to wait for data to arrive in a table (e.g. "wait until today's batch appears").
Your query should return a row when the condition is met. Example: SELECT 1 FROM events WHERE date = '{{ ds }}' LIMIT 1
Waits until a specific datetime. Useful for "run this task at exactly 9:00 AM" or "wait until market opens."
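For example, a DateTimeSensor can hold a task until a templated timestamp (a sketch; the 9:00 AM target is an assumption):

from airflow.sensors.date_time import DateTimeSensor

# Wait until 9:00 AM on the run's logical date
wait_for_market_open = DateTimeSensor(
    task_id="wait_for_market_open",
    target_time="{{ ds }} 09:00:00",
)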
Create your own by extending BaseSensorOperator and implementing poke(self, context). Return True when done, False to retry.
from airflow.sensors.base import BaseSensorOperator

class MyCustomSensor(BaseSensorOperator):
    def poke(self, context):
        # Your logic here
        return some_condition_met  # True or False
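Because it inherits from BaseSensorOperator, your sensor accepts the standard knobs for free; a usage sketch with placeholder values:

wait_custom = MyCustomSensor(
    task_id="wait_custom",
    poke_interval=30,     # inherited from BaseSensorOperator
    timeout=1800,
    mode="reschedule",
)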
Triggers another DAG from within your DAG. Pass conf to send configuration (e.g. date, parameters). The triggered DAG runs asynchronously.
DAG A finishes loading data. It triggers DAG B with conf={"date": "{{ ds }}"}. DAG B can read this via dag_run.conf.
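On the receiving side, a sketch of how child_dag might read that conf (the task and callable names are made up):

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def read_conf(**context):
    # conf passed by TriggerDagRunOperator; empty dict if none was sent
    conf = context["dag_run"].conf or {}
    print(f"Triggered for date: {conf.get('date')}")

with DAG(
    dag_id="child_dag",
    start_date=datetime(2024, 1, 1),
    schedule=None,  # runs only when triggered
    catchup=False,
) as dag:
    PythonOperator(task_id="read_conf", python_callable=read_conf)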
Trigger DAGs without the UI:
From the CLI: airflow dags trigger my_dag
Via the REST API: POST /api/v1/dags/{dag_id}/dagRuns with optional conf
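Calling the REST API from Python, for example (a sketch; the host and basic-auth credentials are assumptions, and the API must be enabled in your deployment):

import requests

resp = requests.post(
    "http://localhost:8080/api/v1/dags/my_dag/dagRuns",
    auth=("admin", "admin"),  # hypothetical credentials
    json={"conf": {"date": "2024-01-01"}},
)
resp.raise_for_status()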
Async (deferrable) sensors don't use workers at all while waiting. They defer to a "triggerer" process and wake up when ready. Best of both worlds: no worker slots, no polling! Use async variants like TimeSensorAsync and DateTimeSensorAsync when available; they're more efficient than classic poke/reschedule sensors.
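A minimal sketch with TimeSensorAsync (assumes a triggerer process is running in your deployment):

from datetime import time
from airflow.sensors.time_sensor import TimeSensorAsync

# Defers to the triggerer instead of holding a worker slot
wait_for_9am = TimeSensorAsync(
    task_id="wait_for_9am",
    target_time=time(hour=9),
)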
See how sensors behave differently: Poke holds a worker slot; Reschedule releases it and tries again later.
Poke: Sensor keeps worker busy. Reschedule: Sensor releases slot, reschedules itself.
From start to success: the sensor loops until the condition is met or the timeout is reached.
Weather and Sales branches run in parallel, then join together
If "Fetch Weather" takes 5 minutes and "Fetch Sales" takes 3 minutes, running them in parallel means the total is only 5 minutes (not 8). DAGs automatically detect which tasks can run in parallel!
Watch how data flows through Airflow's components:
How Airflow Processes a DAG
Airflow comes with a stunning web interface. Here's what you'll see:
DAGs view: The home page. Shows all your DAGs with their status (running, paused, failed), schedule, owner, and recent task statuses as colored circles.
Graph view: Shows the structure of a DAG as a visual graph — tasks as boxes and dependencies as arrows. Color-coded by task state (green = success, red = failed, yellow = running).
Grid view: Shows the history of all runs over time. Each column is a DAG run, each row is a task. This is the most powerful view for debugging — you can see patterns like "this task fails every Monday."
Code view: Shows the actual Python code of your DAG file directly in the browser. Great for quick debugging without opening your IDE.
Airbnb: Created Airflow! Uses it for search ranking, pricing, and host analytics pipelines.
Runs 20,000+ DAGs for recommendation engines and content analytics.
Orchestrates ML model training, pricing calculations, and driver analytics.
Manages data pipelines that power content recommendations for 200M+ users.
from airflow import DAG
from airflow.sensors.filesystem import FileSensor
from airflow.operators.bash import BashOperator
from datetime import datetime

with DAG(
    dag_id="file_sensor_dag",
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,
) as dag:
    wait_for_file = FileSensor(
        task_id="wait_for_file",
        filepath="/data/reports/{{ ds }}.csv",
        fs_conn_id="fs_default",
        poke_interval=60,
        timeout=3600,
        mode="reschedule",
    )

    process = BashOperator(
        task_id="process_file",
        bash_command="echo 'Processing file...'",
    )

    wait_for_file >> process
from airflow import DAG
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

with DAG(
    dag_id="downstream_dag",
    start_date=datetime(2024, 1, 1),
    schedule="0 2 * * *",  # 2 AM daily
    catchup=False,
) as dag:
    wait_for_ingestion = ExternalTaskSensor(
        task_id="wait_for_ingestion",
        external_dag_id="ingestion_dag",
        external_task_id="load_complete",
        execution_delta=timedelta(hours=1),
        timeout=7200,
    )

    run_analytics = BashOperator(
        task_id="run_analytics",
        bash_command="echo 'Running analytics...'",
    )

    wait_for_ingestion >> run_analytics
from airflow import DAG
from airflow.providers.http.sensors.http import HttpSensor
from airflow.operators.bash import BashOperator
from datetime import datetime

def check_response(response):
    return response.status_code == 200 and "ready" in response.json().get("status", "")

with DAG(
    dag_id="http_sensor_dag",
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,
) as dag:
    wait_for_api = HttpSensor(
        task_id="wait_for_api",
        http_conn_id="my_api",
        endpoint="/health",
        response_check=check_response,
        poke_interval=30,
    )

    fetch_data = BashOperator(
        task_id="fetch_data",
        bash_command="echo 'API ready, fetching...'",
    )

    wait_for_api >> fetch_data
from airflow import DAG
from airflow.providers.common.sql.sensors.sql import SqlSensor
from airflow.operators.bash import BashOperator
from datetime import datetime

with DAG(
    dag_id="sql_sensor_dag",
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,
) as dag:
    wait_for_data = SqlSensor(
        task_id="wait_for_data",
        conn_id="postgres_default",
        sql="SELECT 1 FROM events WHERE date = '{{ ds }}' LIMIT 1",
        poke_interval=60,
    )

    process = BashOperator(
        task_id="process",
        bash_command="echo 'Data arrived!'",
    )

    wait_for_data >> process
from airflow import DAG
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.operators.bash import BashOperator
from datetime import datetime

with DAG(
    dag_id="parent_dag",
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,
) as dag:
    prepare = BashOperator(
        task_id="prepare",
        bash_command="echo 'Data prepared'",
    )

    trigger_child = TriggerDagRunOperator(
        task_id="trigger_child",
        trigger_dag_id="child_dag",
        conf={"date": "{{ ds }}", "source": "parent"},
    )

    prepare >> trigger_child
Try these sensor-focused exercises. Don't peek at the answers until you've tried!
Your DAG needs to wait until a daily CSV file sales_{{ ds }}.csv appears in /data/ before processing. Which sensor do you use?
Question: Name the sensor and list the key parameters you'd set.
FileSensor. Key parameters: filepath="/data/sales_{{ ds }}.csv", fs_conn_id (if using remote FS), optionally poke_interval and timeout. Use mode="reschedule" if the wait could be long.
You have 5 FileSensors in a DAG, each waiting potentially 2+ hours for files from different upstream systems. Your Airflow cluster has only 4 workers.
Question: What problem will you hit, and how do you fix it?
Problem: In poke mode, each sensor occupies a worker slot. 5 sensors and 4 workers = one sensor will be queued, and you'll block other DAGs.
Fix: Set mode="reschedule" on all sensors. They'll release the slot between pokes and reschedule themselves. Workers stay free for other tasks.
Your analytics_dag must run only after ingestion_dag's task load_to_warehouse has succeeded for the same logical date.
Question: Write the ExternalTaskSensor configuration (pseudocode or key args).
# Solution:
ExternalTaskSensor(
    task_id="wait_for_ingestion",
    external_dag_id="ingestion_dag",
    external_task_id="load_to_warehouse",
    # Same execution date by default; use execution_delta if schedules differ
)
You need to wait until a Redis list has at least 10 items before proceeding. No built-in sensor exists.
Question: Sketch the poke() method for a custom sensor. What should it return when ready? When not ready?
Implement poke(self, context) that: (1) connects to Redis, (2) gets the list length, (3) returns True if length ≥ 10 (condition met, sensor succeeds), (4) returns False otherwise (Airflow will retry after poke_interval).
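A minimal sketch of such a sensor, assuming the redis-py client; the list key, threshold, and host are hypothetical:

from airflow.sensors.base import BaseSensorOperator
import redis

class RedisListSensor(BaseSensorOperator):
    def __init__(self, list_key, min_items=10, redis_host="localhost", **kwargs):
        super().__init__(**kwargs)
        self.list_key = list_key
        self.min_items = min_items
        self.redis_host = redis_host

    def poke(self, context):
        r = redis.Redis(host=self.redis_host)
        length = r.llen(self.list_key)  # current number of items in the list
        self.log.info("List %s has %d items", self.list_key, length)
        return length >= self.min_items  # True: succeed; False: poke again later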
Test your understanding of Sensors & Triggers! Click on the answer you think is correct.