Write maintainable, secure, and reliable DAGs with style conventions, credentials, idempotency, and more.
Consistent naming and structure make DAGs easier to read, debug, and hand off to teammates.
Examples: DAG IDs like daily_sales_etl and weekly_report_pipeline, and task IDs like extract_orders and load_to_warehouse. Style conventions are like putting toys in labeled boxes. When everyone uses the same labels and places things in the same spots, anyone can find what they need without a mess!
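A minimal sketch of what that looks like in code (these specific IDs are illustrative, not required by Airflow):

# Naming sketch: snake_case IDs that say what runs and how often (IDs are examples only)
from datetime import datetime

from airflow.models import DAG
from airflow.operators.python import PythonOperator

with DAG(
    dag_id="daily_sales_etl",            # <subject>_<purpose>, states what and how often
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,
) as dag:
    extract_orders = PythonOperator(task_id="extract_orders", python_callable=lambda: None)
    load_to_warehouse = PythonOperator(task_id="load_to_warehouse", python_callable=lambda: None)
    extract_orders >> load_to_warehouse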
Never hardcode passwords, API keys, or connection strings in your DAG code. Use Airflow Connections and Variables instead.
Hardcoded secrets end up in Git history and in the Airflow UI (code view). Anyone with repo or UI access can steal them. Connections and Variables keep secrets out of code and can be encrypted.
The scheduler re-parses your DAG file frequently (every 30 seconds by default). If that file does slow work (big API calls, heavy imports, large loops), parsing stalls and all DAGs suffer.
Parse time is like the teacher reading the class schedule. If the schedule had to bake a cake before the teacher could read it, nobody would know when class starts! Keep the schedule quick to read; do the baking (heavy work) when it's actually time for class (when the task runs).
Use TaskGroup to group related tasks (e.g. "extract", "transform", "load") so the Graph View is cleaner and dependencies are easier to manage.
Task Groups show up as a collapsible subgraph in the UI and help you avoid hundreds of tasks in one flat list.
Use Task Groups when you have 5+ related tasks that form a logical unit (e.g. "Fetch from API A", "Fetch from API B", "Merge", "Validate", "Load"). One TaskGroup per logical step keeps the DAG readable.
An operation is idempotent if running it once or multiple times with the same input produces the same result. In Airflow, tasks get retried and DAG runs can be re-triggered; idempotency prevents duplicate or wrong data.
Write outputs to a path keyed by the logical date (e.g. s3://bucket/data/{{ ds }}/) so each run has its own output; a re-run overwrites only that run's data. Idempotency is like turning a light switch "on." No matter how many times you flip it to "on," the light is still just on. Flipping "on" again doesn't make it "double on." You want your tasks to be like that: running them again doesn't create a mess.
Where possible, process only new or changed data (e.g. since last run) instead of full reloads. This saves time, cost, and reduces risk of overwriting good data.
Use execution date / data interval in your queries (e.g. WHERE date = '{{ ds }}') and design sources to support "give me data for this interval."
Incremental loading makes pipelines faster and cheaper. Combine it with idempotent writes (e.g. overwrite only that day's partition) for reliable, repeatable runs.
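A sketch of an incremental extract using the run's data interval; the conn_id "source_db" and the orders table with an updated_at column are assumptions for illustration:

# Sketch: extract only this run's data interval (conn_id and table/column names are assumed)
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook

def extract_new_orders(**context):
    start = context["data_interval_start"]   # start of this run's interval
    end = context["data_interval_end"]       # end of this run's interval
    hook = PostgresHook(postgres_conn_id="source_db")
    rows = hook.get_records(
        "SELECT * FROM orders WHERE updated_at >= %s AND updated_at < %s",
        parameters=(start, end),
    )
    # ... write rows to this run's own partition, e.g. s3://bucket/data/{{ ds }}/
    return len(rows)

extract_new = PythonOperator(task_id="extract_new_orders", python_callable=extract_new_orders)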
Pools limit how many tasks of a certain type run at the same time. Use them when you have a limited resource (e.g. database connections, API rate limits, or a single external system that can't handle 100 parallel tasks).
Create a pool in the Airflow UI (Admin → Pools), set its size (e.g. 5), and assign that pool to the relevant tasks. Airflow will queue extra tasks until a slot is free.
A pool is like a restaurant with 10 tables. Even if 50 orders come in, only 10 tables can be used at once. The rest wait in line. Pools are your "number of tables" for a given type of task.
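A sketch of assigning a pool to a task; the pool "api_pool" with 5 slots is an assumption, created beforehand in Admin → Pools or via the CLI:

# Sketch: cap concurrency with a pool (pool "api_pool" with 5 slots is assumed to exist,
# created in Admin -> Pools or via: airflow pools set api_pool 5 "External API limit")
from airflow.operators.python import PythonOperator

call_api = PythonOperator(
    task_id="call_external_api",
    python_callable=lambda: None,   # placeholder for the real API call
    pool="api_pool",                # Airflow runs at most 5 tasks from this pool at once
)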
An SLA (Service Level Agreement) is a time limit: "This task should finish within 30 minutes of the DAG run starting." If the task misses the SLA, Airflow can send alerts (e.g. email) so you notice delays quickly.
Set sla on individual tasks (or for every task at once via the DAG's default_args) using a timedelta. SLA misses show up in the UI and can trigger the DAG's sla_miss_callback.
Set SLAs on critical path tasks (e.g. "daily load must finish by 6am"). Don't set them on every task — focus on the ones that affect downstream users or reports.
Running the same task again with the same inputs should yield the same outcome. Here's how idempotent vs non-idempotent behavior looks:
Idempotent: run 1 and run 2 both produce Output A. Non-idempotent: run 2 adds again and creates duplicates.
Design tasks so that "run again" either overwrites the same logical result (e.g. same partition) or uses upsert/merge so the final state is correct. Avoid blind appends that double data on retry or re-run.
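One way this can look in SQL, as a rough sketch: the conn_id "warehouse_db" and the table names are assumptions, and the same idea applies to overwriting a file partition:

# Sketch: idempotent load that replaces the logical date's rows instead of appending
# (conn_id "warehouse_db" and the table names are assumptions)
from airflow.providers.postgres.operators.postgres import PostgresOperator

load_daily_sales = PostgresOperator(
    task_id="load_daily_sales",
    postgres_conn_id="warehouse_db",
    sql="""
        -- Re-running for the same {{ ds }} yields the same final state (no duplicates).
        -- Alternative: an upsert such as INSERT ... ON CONFLICT (key) DO UPDATE ...
        DELETE FROM warehouse.daily_sales WHERE sales_date = '{{ ds }}';
        INSERT INTO warehouse.daily_sales
        SELECT * FROM staging.daily_sales WHERE sales_date = '{{ ds }}';
    """,
)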
Use Connections for host/login/password and Variables for config. Never hardcode secrets.
# BAD: Hardcoded credentials
import psycopg2

def bad_task():
    conn = psycopg2.connect("host=db.example.com user=etl password=secret123")

# GOOD: Use an Airflow Connection (by conn_id)
from airflow.providers.postgres.hooks.postgres import PostgresHook

def good_task():
    hook = PostgresHook(postgres_conn_id="warehouse_db")
    conn = hook.get_conn()
    # ... use conn

# GOOD: Use a Variable for config (e.g. bucket name)
from airflow.models import Variable

def configured_task():
    bucket = Variable.get("s3_data_bucket")  # Inside the task, not at DAG parse time!
Keep DAG file lightweight. Do API calls and heavy imports inside the task.
# BAD: Slow at parse time (the scheduler re-reads this file constantly!)
import requests
data = requests.get("https://api.example.com/huge").json()  # Don't!

with DAG(...) as dag:
    task = PythonOperator(task_id="use_data", python_callable=lambda: process(data))

# GOOD: Fetch inside the task
def fetch_and_process(**context):
    import requests
    data = requests.get("https://api.example.com/huge").json()
    return process(data)

with DAG(...) as dag:
    task = PythonOperator(task_id="fetch_and_process", python_callable=fetch_and_process)
Group tasks and write idempotently (e.g. overwrite partition by logical date).
from datetime import datetime

from airflow.models import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup


def load_partition(**context):
    # Idempotent: write to a path keyed by ds; a re-run overwrites that day only
    ds = context["ds"]
    path = f"s3://bucket/data/{ds}/"
    # ... write data to path (overwrite if it exists)


with DAG(
    dag_id="best_practices_dag",
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,
) as dag:
    with TaskGroup(group_id="extract") as extract_tg:
        t1 = PythonOperator(task_id="fetch_api", python_callable=lambda: None)

    with TaskGroup(group_id="load") as load_tg:
        t2 = PythonOperator(task_id="load_to_s3", python_callable=load_partition)

    extract_tg >> load_tg
Set an SLA on the DAG or task and assign a pool to limit concurrency.
from datetime import datetime, timedelta

from airflow.models import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="sla_pool_dag",
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
    sla_miss_callback=my_alert_fn,  # Called when an SLA is missed (my_alert_fn defined elsewhere)
) as dag:
    task = BashOperator(
        task_id="critical_load",
        bash_command="echo 'Loading...'",
        sla=timedelta(minutes=30),  # Must finish within 30 min of the DAG run start
        pool="db_pool",             # Only N tasks in this pool run at once
    )
Try these exercises to apply best practices. Use the hints if you're stuck!
Someone wrote a DAG that does this at the top level (outside any task):
config = requests.get("https://api.example.com/config").json()
Question: Why is this a problem, and where should this call go instead?
The scheduler reads your DAG file over and over, like a teacher checking the schedule. If the schedule had to call a slow phone number every time, the teacher would be stuck and couldn't check other classes. Keep the "schedule" quick; do the phone call when it's actually time for class (inside the task).
It runs at parse time every time the scheduler reads the file, so it slows down the scheduler and hits the API constantly. Move the requests.get(...) inside a PythonOperator callable (or equivalent) so it runs only when the task executes.
A "load" task appends rows to a table every time it runs. When the DAG is re-triggered or the task retries, duplicate rows appear. What's a better approach?
Imagine putting toys in a box. If you "add" every time without checking, you get duplicates. Instead, you could "put today's toys in the box and replace today's slot" so running again doesn't add more.
Use upsert (insert or update by key) or replace partition (e.g. overwrite data for the logical date {{ ds }}) instead of plain append. That way, re-running or retrying produces the same final state without duplicates.
Your DAG needs a database password and an S3 bucket name. Where should each go: in the code, in a Connection, or in a Variable?
Secrets (like passwords) go in a locked drawer (Connections). Labels (like "which bucket") can go in a sticky note (Variables). Never write the actual password on the whiteboard (in code)!
Database password: Store in an Airflow Connection (e.g. Postgres connection with host, user, password). Reference by conn_id in the operator.
S3 bucket name: Store in a Variable (e.g. Variable.get("s3_bucket")) or in a Connection extra if it's part of S3 connection config. Never hardcode either in the DAG file.
You have 50 DAGs that each run a task that talks to the same external API. The API allows only 5 concurrent requests. What Airflow feature do you use, and how?
If 50 people want to use 5 phones, you let 5 use them at a time and the rest wait in line. Pools are Airflow's way of "only 5 at a time" for a type of task.
Use a Pool. In Admin → Pools, create a pool named e.g. api_pool with 5 slots, and set pool="api_pool" on every task that calls that API. Airflow will run only 5 of those tasks at a time; the rest stay queued until a slot frees up.