MODULE 13 OF 15

Best Practices

Write maintainable, secure, and reliable DAGs with style conventions, credentials, idempotency, and more.

Style Conventions

Consistent naming and structure make DAGs easier to read, debug, and hand off to teammates.

Naming & Layout

  • DAG IDs: Use lowercase with underscores, e.g. daily_sales_etl, weekly_report_pipeline.
  • Task IDs: Short, descriptive, lowercase with underscores: extract_orders, load_to_warehouse.
  • One DAG per file: Keep one DAG (or one logical group) per Python file so the scheduler parses quickly and the UI stays clear.
  • Docstrings: Add a short description at the top of the DAG and for complex tasks so others know what the pipeline does.
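A minimal sketch of these conventions (the DAG ID, task names, and docstring here are just illustrative):

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

with DAG(
    dag_id="daily_sales_etl",            # lowercase with underscores
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,
    doc_md="Loads yesterday's orders into the warehouse.",  # short description shown in the UI
) as dag:
    extract_orders = PythonOperator(task_id="extract_orders", python_callable=lambda: None)
    load_to_warehouse = PythonOperator(task_id="load_to_warehouse", python_callable=lambda: None)

    extract_orders >> load_to_warehouse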

Explain Like I'm 5

Style conventions are like putting toys in labeled boxes. When everyone uses the same labels and places things in the same spots, anyone can find what they need without a mess!

Credentials & Secrets

Never hardcode passwords, API keys, or connection strings in your DAG code. Use Airflow Connections and Variables instead.

  • Connections: Store host, port, login, password for databases and APIs. Reference them in tasks by connection ID.
  • Variables: Store non-connection config (e.g. bucket names, feature flags). Use the Variable API or templating.
  • Secret backends: In production, use AWS Secrets Manager, HashiCorp Vault, or similar and point Airflow to them.
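As a sketch, pointing Airflow at AWS Secrets Manager is a small airflow.cfg change (this assumes the Amazon provider is installed; the prefixes are illustrative):

[secrets]
backend = airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend
backend_kwargs = {"connections_prefix": "airflow/connections", "variables_prefix": "airflow/variables"}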

Why It Matters

Hardcoded secrets end up in Git history and in the Airflow UI (code view). Anyone with repo or UI access can steal them. Connections and Variables keep secrets out of code and can be encrypted.

No Heavy Computation at Parse Time

The scheduler re-parses your DAG file over and over (by default roughly every 30 seconds). If that file does slow work (big API calls, heavy imports, large loops), the scheduler bogs down and all DAGs suffer.

  • Keep the top level of the DAG file fast: define the DAG, create tasks, set dependencies. No database queries or HTTP calls at import time.
  • Do heavy work inside the task (e.g. in a PythonOperator callable or in the operator's execute method). That code runs only when the task runs.
  • Lazy-load heavy libraries inside the task if you don't need them for defining the DAG.

Explain Like I'm 5

Parse time is like the teacher reading the class schedule. If the schedule had to bake a cake before the teacher could read it, nobody would know when class starts! Keep the schedule quick to read; do the baking (heavy work) when it's actually time for class (when the task runs).

Task Groups

Use TaskGroup to group related tasks (e.g. "extract", "transform", "load") so the Graph View is cleaner and dependencies are easier to manage.

Task Groups show up as a collapsible subgraph in the UI and help you avoid hundreds of tasks in one flat list.

When to Use

Use Task Groups when you have 5+ related tasks that form a logical unit (e.g. "Fetch from API A", "Fetch from API B", "Merge", "Validate", "Load"). One TaskGroup per logical step keeps the DAG readable.

Idempotency

An operation is idempotent if running it once or multiple times with the same input produces the same result. In Airflow, tasks get retried and DAG runs can be re-triggered; idempotency prevents duplicate or wrong data.

  • Load tasks: Use "upsert" or "replace partition" instead of blind append, so re-running doesn't duplicate rows.
  • File tasks: Write to a path that includes the logical date (e.g. s3://bucket/data/{{ ds }}/) so each run has its own output; re-run overwrites only that run's data.
  • API calls: If the API is idempotent (e.g. "set value to X"), re-running is safe. If not, use idempotency keys or check-before-write logic inside the task.
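As a sketch of the "replace partition" idea in SQL (the table names and connection ID are hypothetical), a re-run first clears that day's rows, so the final state is the same no matter how many times it runs:

from airflow.providers.postgres.hooks.postgres import PostgresHook

def load_daily_sales(**context):
    ds = context["ds"]  # logical date, e.g. "2024-01-01"
    hook = PostgresHook(postgres_conn_id="warehouse_db")
    # Delete-then-insert for this date only: re-running replaces rows instead of appending duplicates
    hook.run("DELETE FROM daily_sales WHERE sale_date = %s", parameters=(ds,))
    hook.run(
        "INSERT INTO daily_sales SELECT * FROM staging_sales WHERE sale_date = %s",
        parameters=(ds,),
    )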

Explain Like I'm 5

Idempotency is like turning a light switch "on." No matter how many times you flip it to "on," the light is still just on. Flipping "on" again doesn't make it "double on." You want your tasks to be like that — running them again doesn't create a mess.

Incremental Loading

Where possible, process only new or changed data (e.g. data since the last run) instead of doing full reloads. This saves time and cost, and it reduces the risk of overwriting good data.

Use the logical date / data interval in your queries (e.g. WHERE date = '{{ ds }}') and design sources to support "give me data for this interval."
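A minimal sketch of an interval-bounded extract (the connection ID and table name are hypothetical):

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook

def extract_increment(**context):
    # Pull only this run's data interval instead of reloading everything
    start = context["data_interval_start"]
    end = context["data_interval_end"]
    hook = PostgresHook(postgres_conn_id="warehouse_db")
    rows = hook.get_records(
        "SELECT * FROM raw_orders WHERE created_at >= %s AND created_at < %s",
        parameters=(start, end),
    )
    # ... write rows to this run's own partition (keeps the load idempotent)

with DAG(dag_id="incremental_orders", start_date=datetime(2024, 1, 1), schedule="@daily", catchup=False) as dag:
    extract = PythonOperator(task_id="extract_increment", python_callable=extract_increment)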

Benefit

Incremental loading makes pipelines faster and cheaper. Combine it with idempotent writes (e.g. overwrite only that day's partition) for reliable, repeatable runs.

Pools

Pools limit how many tasks of a certain type run at the same time. Use them when you have a limited resource (e.g. database connections, API rate limits, or a single external system that can't handle 100 parallel tasks).

Create a pool in the Airflow UI (Admin → Pools), set its size (e.g. 5), and assign that pool to the relevant tasks. Airflow will queue extra tasks until a slot is free.
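A minimal sketch (the pool name, its size of 5, and the placeholder task are illustrative):

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

# Create the pool once (Admin → Pools in the UI, or: airflow pools set warehouse_db_pool 5 "Limit concurrent DB work")
with DAG(dag_id="pooled_load", start_date=datetime(2024, 1, 1), schedule="@daily", catchup=False) as dag:
    load = PythonOperator(
        task_id="load_to_warehouse",
        python_callable=lambda: None,   # placeholder callable
        pool="warehouse_db_pool",       # waits until one of the pool's 5 slots is free
    )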

Analogy

A pool is like a restaurant with 10 tables. Even if 50 orders come in, only 10 tables can be used at once. The rest wait in line. Pools are your "number of tables" for a given type of task.

SLAs

An SLA (Service Level Agreement) is a time limit: "This task should finish within 30 minutes of the DAG run starting." If the task misses the SLA, Airflow can send alerts (e.g. email) so you notice delays quickly.

Set sla on individual tasks (or on all tasks at once via the DAG's default_args) using a timedelta. SLA misses are visible in the UI and can trigger callbacks.

Best Practice

Set SLAs on critical path tasks (e.g. "daily load must finish by 6am"). Don't set them on every task — focus on the ones that affect downstream users or reports.

Idempotency Diagram

Running the same task again with the same inputs should yield the same outcome. Here's how idempotent vs non-idempotent behavior looks:

[Diagram] Idempotent (good): same input → task run 1 → Output A; task run 2 → Output A (same result ✓). Non-idempotent (bad): same input → task run 1 → Output A; task run 2 → A + A (duplicates ✗).

Idempotent: run 1 and run 2 both produce Output A. Non-idempotent: run 2 adds again and creates duplicates.

Takeaway

Design tasks so that "run again" either overwrites the same logical result (e.g. same partition) or uses upsert/merge so the final state is correct. Avoid blind appends that double data on retry or re-run.

Credentials: Connections & Variables

Use Connections for host/login/password and Variables for config. Never hardcode secrets.

# BAD: Hardcoded credentials
import psycopg2

def bad_task():
    conn = psycopg2.connect("host=db.example.com user=etl password=secret123")  # Secret lives in code and Git history!

# GOOD: Use an Airflow Connection (by conn_id)
from airflow.providers.postgres.hooks.postgres import PostgresHook

def good_task():
    hook = PostgresHook(postgres_conn_id="warehouse_db")
    conn = hook.get_conn()
    # ... use conn

# GOOD: Use a Variable for config (e.g. bucket name) -- read it inside the task
from airflow.models import Variable

def config_task():
    bucket = Variable.get("s3_data_bucket")  # Read at run time, not at DAG parse time
    # ... use bucket

No Heavy Work at Parse Time

Keep DAG file lightweight. Do API calls and heavy imports inside the task.

# BAD: Slow at parse time (the scheduler re-reads this file over and over!)
import requests
data = requests.get("https://api.example.com/huge").json()  # Don't!

with DAG(...) as dag:
    task = PythonOperator(task_id="use_data", python_callable=lambda: process(data))

# GOOD: Fetch inside the task
def fetch_and_process(**context):
    import requests
    data = requests.get("https://api.example.com/huge").json()
    return process(data)

with DAG(...) as dag:
    task = PythonOperator(task_id="fetch_and_process", python_callable=fetch_and_process)

TaskGroup & Idempotent Load

Group tasks and write idempotently (e.g. overwrite partition by logical date).

from airflow.models import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup
from datetime import datetime

def load_partition(**context):
    # Idempotent: write to path keyed by ds; re-run overwrites same day only
    ds = context["ds"]
    path = f"s3://bucket/data/{ds}/"
    # ... write data to path (overwrite if exists)

with DAG(dag_id="best_practices_dag", start_date=datetime(2024, 1, 1), schedule="@daily", catchup=False) as dag:
    with TaskGroup(group_id="extract") as extract_tg:
        t1 = PythonOperator(task_id="fetch_api", python_callable=lambda: None)
    with TaskGroup(group_id="load") as load_tg:
        t2 = PythonOperator(task_id="load_to_s3", python_callable=load_partition)
    extract_tg >> load_tg

SLA & Pool

Set an SLA on the DAG or task and assign a pool to limit concurrency.

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator

def my_alert_fn(*args, **kwargs):
    """Notify someone when an SLA is missed (e.g. send an email or Slack message)."""
    ...

with DAG(
    dag_id="sla_pool_dag",
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
    sla_miss_callback=my_alert_fn,   # Called when an SLA is missed
) as dag:
    task = BashOperator(
        task_id="critical_load",
        bash_command="echo 'Loading...'",
        sla=timedelta(minutes=30),  # Must finish within 30 min of the DAG run start
        pool="db_pool",             # Only as many tasks as the pool has slots run at once
    )

Practice Exercises

Try these exercises to apply best practices. Use the hints if you're stuck!

Exercise 1: Spot the Parse-Time Mistake

Scenario

Someone wrote a DAG that does this at the top level (outside any task):

config = requests.get("https://api.example.com/config").json()

Question: Why is this a problem, and where should this call go instead?

ELI5 Hint

The scheduler reads your DAG file over and over, like a teacher checking the schedule. If the schedule had to call a slow phone number every time, the teacher would be stuck and couldn't check other classes. Keep the "schedule" quick; do the phone call when it's actually time for class (inside the task).

Answer


It runs at parse time every time the scheduler reads the file, so it slows down the scheduler and hits the API constantly. Move the requests.get(...) inside a PythonOperator callable (or equivalent) so it runs only when the task executes.

Exercise 2: Make It Idempotent

Scenario

A "load" task appends rows to a table every time it runs. When the DAG is re-triggered or the task retries, duplicate rows appear. What's a better approach?

ELI5 Hint

Imagine putting toys in a box. If you "add" every time without checking, you get duplicates. Instead, you could "put today's toys in the box and replace today's slot" so running again doesn't add more.

Answer


Use upsert (insert or update by key) or replace partition (e.g. overwrite data for the logical date {{ ds }}) instead of plain append. That way, re-running or retrying produces the same final state without duplicates.

Exercise 3: Where to Put Secrets?

Scenario

Your DAG needs a database password and an S3 bucket name. Where should each go: in the code, in a Connection, or in a Variable?

ELI5 Hint

Secrets (like passwords) go in a locked drawer (Connections). Labels (like "which bucket") can go in a sticky note (Variables). Never write the actual password on the whiteboard (in code)!

Answer


Database password: Store in an Airflow Connection (e.g. Postgres connection with host, user, password). Reference by conn_id in the operator.
S3 bucket name: Store in a Variable (e.g. Variable.get("s3_bucket")) or in a Connection extra if it's part of S3 connection config. Never hardcode either in the DAG file.

Exercise 4: When to Use a Pool?

Scenario

You have 50 DAGs that each run a task that talks to the same external API. The API allows only 5 concurrent requests. What Airflow feature do you use, and how?

ELI5 Hint

If 50 people want to use 5 phones, you let 5 use them at a time and the rest wait in line. Pools are Airflow's way of "only 5 at a time" for a type of task.

Answer


Use a Pool. Create a pool in Admin → Pools with name e.g. api_pool and size 5. Assign pool="api_pool" to every task that calls that API. Airflow will run only 5 of those tasks at a time; the rest stay queued until a slot is free.

Module 13 Quiz

Test your understanding with the review questions below.

1. Where should you store a database password in Airflow?

2. Why should you avoid heavy computation at DAG parse time?

3. What does "idempotent" mean for a task?

4. TaskGroup is used to ___?

5. Incremental loading means ___?

6. What is an Airflow Pool used for?

7. An SLA (Service Level Agreement) in Airflow ___?

8. Which is a good style convention for DAG and task IDs?