MODULE 11 OF 15

Testing Airflow

Ship with confidence using DAG integrity tests and unit tests!

Why Test?

Pipelines run in production every day. A broken DAG can block reports, break dashboards, or corrupt data. Testing catches mistakes before they reach production so you can ship with confidence.

Explain Like I'm 5

Imagine you're building a toy train track. Before you show it to your friends, you run the train once to make sure no pieces are missing and the train doesn't fall off. Testing your DAGs is the same idea — you check that everything runs correctly before you let it run for real in production!

Two main kinds of tests matter for Airflow:

  • DAG integrity tests — Does the DAG load? Are there cycles? Do task IDs match?
  • Unit tests — Does each task do what it's supposed to do? (Often with mocked external systems.)

DAG Integrity Tests

Before any task runs, Airflow must be able to parse and load your DAG file. DAG integrity tests check that:

  • The file has no syntax errors and the DAG object is created.
  • There are no cycles (the graph is acyclic).
  • Task IDs are unique and dependencies are valid.
  • Required arguments (e.g. start_date) are set.
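The acyclicity check in that list can be sketched without Airflow at all. The snippet below is illustrative only — the task graph is hypothetical and Airflow runs its own cycle check when building a DAG — but it shows the idea using the standard library's graphlib:

```python
# Sketch only: how a cycle check works in principle. Airflow performs its
# own check when you wire up task dependencies.
from graphlib import TopologicalSorter, CycleError

# Hypothetical task graph: each task maps to the set of tasks it depends on.
deps = {
    "extract": set(),
    "transform": {"extract"},
    "load": {"transform"},
}

def assert_acyclic(graph):
    """Raise ValueError if the dependency graph contains a cycle."""
    try:
        # static_order() walks the graph and raises CycleError on a cycle
        list(TopologicalSorter(graph).static_order())
    except CycleError as exc:
        raise ValueError(f"cycle detected: {exc.args[1]}") from exc

assert_acyclic(deps)  # extract -> transform -> load is acyclic, so no error
```

If you added `deps["extract"] = {"load"}`, the graph would loop back on itself and the check would fail — exactly the mistake a DAG integrity test catches before deployment.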

airflow dags test

Airflow provides a built-in way to test that a DAG loads correctly:

Command

airflow dags test <dag_id> <execution_date> runs the DAG once for that date without recording state in the metadata database. It's great for a quick sanity check that the DAG runs end-to-end.

For CI (continuous integration), you typically want to parse all DAGs in your repo and assert no import errors. That way every pull request is checked before merge.

Unit Testing Tasks

Unit tests focus on one task (or one piece of logic) in isolation. You call the task's callable or operator with fake or controlled inputs and assert on the output or side effects.

Explain Like I'm 5

Instead of really calling the weather API (which might be slow or cost money), you pretend to be the API and return a fixed answer. Then you check that your task does the right thing with that answer. That's unit testing!

With pytest, you can test PythonOperator callables, custom operators' execute() methods, and helper functions. Use Airflow's test utilities (e.g. creating a mock context) so tasks run in a test environment.
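A minimal sketch of that pattern: the callable below is hypothetical (the kind of function you would pass to a PythonOperator), and instead of a real Airflow runtime context we hand-build a dictionary with just the `ds` template variable the function needs.

```python
# Sketch: unit-testing a PythonOperator-style callable with a hand-built
# context. The callable is illustrative; "ds" is Airflow's standard
# template variable for the run's logical date as YYYY-MM-DD.

def build_greeting(**context):
    """Example callable you might pass to a PythonOperator."""
    return f"Run for {context['ds']}"

def test_build_greeting_uses_logical_date():
    # Arrange: a minimal fake context instead of a real Airflow one
    fake_context = {"ds": "2024-01-01"}
    # Act
    result = build_greeting(**fake_context)
    # Assert
    assert result == "Run for 2024-01-01"
```

Because the test controls the context, it runs in milliseconds and needs no scheduler, database, or Airflow installation.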

Mocking

Tasks often talk to databases, APIs, or cloud services. In unit tests you usually mock those so tests are fast, free, and don't depend on the network.

Python's unittest.mock (or pytest with plugins) lets you replace a real connection or HTTP call with a fake that returns fixed data. You then assert that your task logic handles that data correctly.
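For example, a task that queries a database can be tested with a MagicMock standing in for the hook. The function under test and the data here are hypothetical; the `get_records` method mirrors the interface of Airflow's DbApiHook:

```python
# Sketch: mocking a database hook with unittest.mock. The function and
# data are illustrative; get_records mirrors Airflow's DbApiHook interface.
from unittest.mock import MagicMock

def count_active_users(hook):
    """Task logic under test: counts the rows the hook returns."""
    rows = hook.get_records("SELECT id FROM users WHERE active = true")
    return len(rows)

def test_count_active_users_with_mocked_hook():
    fake_hook = MagicMock()
    fake_hook.get_records.return_value = [(1,), (2,), (3,)]  # canned rows

    assert count_active_users(fake_hook) == 3
    # Verify the task actually queried the hook, exactly once
    fake_hook.get_records.assert_called_once()
```

The test never opens a real connection, yet it still verifies both the result and the interaction with the hook.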

Rule of Thumb

Mock external systems in unit tests. Use integration tests or staging only when you need to verify real connections.

CI/CD

Continuous Integration / Continuous Deployment means every time you push code, automated steps run: lint, tests, and often deploy to a test or production environment.

For Airflow, a typical CI pipeline:

  • Check out the repo and set up a Python environment.
  • Install Airflow and dependencies.
  • Run DAG integrity tests (parse all DAGs, ensure no errors).
  • Run unit tests (pytest).
  • Optionally deploy DAGs to a test/staging Airflow instance.

If any step fails, the pipeline stops and the code doesn't get deployed. That keeps production safe.
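That "stop at the first failure" behavior can be modeled as a plain function. This is a toy sketch, not how CI systems are implemented — step names and the injected runner are illustrative — but it captures the gate logic:

```python
# Sketch: a CI pipeline as a function. Each step either passes or fails;
# the first failure stops the pipeline before deploy. The runner is
# injected so the logic can be tested with a fake.

def run_pipeline(steps, run):
    """Run steps in order; stop and report at the first failing step."""
    for name in steps:
        if not run(name):          # run() returns True on success
            return f"failed at: {name}"
    return "deployed"

STEPS = ["checkout", "lint", "dag-parse", "unit-tests", "deploy"]
```

With a fake runner that fails `dag-parse`, `run_pipeline(STEPS, ...)` returns `failed at: dag-parse` and the deploy step never runs — which is precisely what keeps a broken DAG out of production.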

DTAP

DTAP stands for Development, Test, Acceptance, Production. It's a way to separate environments so you never test on production data or break real users.

Development

Where you write and run DAGs locally. Fast feedback, no impact on others.

Test

Automated tests run here (CI). Often uses mocked or small datasets.

Acceptance

Staging: run full pipelines with real-ish data to validate before production.

Production

Live system. Only deploy after tests and acceptance pass.

Explain Like I'm 5

DTAP is like having a practice kitchen (Development), a taste test (Test), a soft opening for friends (Acceptance), and then the real restaurant open to everyone (Production). You never serve customers a dish you haven't tried first!

CI/CD Flow

Code flows through automated checks before it reaches production. Here's a simple CI/CD pipeline for Airflow DAGs:

CI/CD Pipeline for Airflow


Git Push → Checkout → Lint → DAG Parse → Unit Tests → Deploy

Each step must pass before the next runs. If tests fail, deployment stops.

DTAP Diagram

Environments from left to right: code moves through Development → Test → Acceptance → Production.

  • Development — local / dev
  • Test (CI) — automated tests
  • Acceptance — staging
  • Production — live

DTAP: Development → Test → Acceptance → Production

Key Idea

Never skip Test or Acceptance when shipping to Production. CI runs tests automatically; Acceptance is where you validate with real workflows before going live.

Pytest Examples

Use pytest to unit-test your task logic. Example: testing a simple Python callable used in a PythonOperator.

# tests/test_tasks.py
import pytest
from unittest.mock import MagicMock, patch
from my_dag import transform_data

def test_transform_data_returns_list():
    # Arrange: fake input
    raw = [{"id": 1, "value": 10}, {"id": 2, "value": 20}]
    # Act
    result = transform_data(raw)
    # Assert
    assert isinstance(result, list)
    assert len(result) == 2
    assert result[0]["value"] == 10

@patch("my_dag.requests.get")
def test_fetch_data_mocks_http(mock_get):
    mock_get.return_value.json.return_value = {"data": [1, 2, 3]}
    from my_dag import fetch_data
    result = fetch_data()
    assert result == [1, 2, 3]
    mock_get.assert_called_once()

What This Does

test_transform_data_returns_list calls your transform_data function with fake data and checks the result shape and values. No Airflow, no database — just the logic.

test_fetch_data_mocks_http uses @patch to replace the real requests.get with a mock that returns fixed JSON. So the test doesn't hit the real API!

DAG Integrity Test

In CI, parse all DAGs in your dags/ folder and ensure none raise errors. Example with pytest:

# tests/test_dag_integrity.py
import os
import pytest
from airflow.models import DagBag

DAG_FOLDER = os.path.join(os.path.dirname(__file__), "..", "dags")

@pytest.fixture(scope="module")
def dag_bag():
    # Build the DagBag once and share it across tests in this module
    return DagBag(dag_folder=DAG_FOLDER, include_examples=False)

def test_dags_load_with_no_errors(dag_bag):
    assert len(dag_bag.import_errors) == 0, \
        f"DAG import errors: {dag_bag.import_errors}"

def test_dags_have_tags(dag_bag):
    # Require every DAG to declare at least one tag
    for dag_id, dag in dag_bag.dags.items():
        assert dag.tags, f"DAG {dag_id} has no tags"

Pro Tip

Run pytest tests/ in your CI job. Add test_dags_load_with_no_errors so every DAG in dags/ is parsed. If someone pushes a broken DAG, the build fails and it never reaches production.

Practice Exercises

Try these to solidify testing concepts. Use ELI5 thinking: keep it simple!

Exercise 1: Why Mock?

Scenario

Your task calls a paid weather API. You want to unit-test the logic that converts the API response into a table. Should you call the real API in the test? Why or why not?

Answer


No. Use a mock. The real API costs money, can be slow, and might be down. In a unit test you only care whether your conversion logic is correct, so you fake the API response and assert on the output. Fast, free, reliable.

Exercise 2: What Does airflow dags test Do?

Question

In one sentence, what is the main purpose of airflow dags test my_dag 2024-01-01?

Answer


It runs the DAG my_dag once for the given execution date without saving the run to the metadata database, so you can quickly verify the DAG runs end-to-end locally.

Exercise 3: Order the DTAP Environments

Challenge

Put these in the order code typically flows: Production, Test, Development, Acceptance.

Answer


Development → Test → Acceptance → Production. You develop first, then automated tests run (Test), then you validate in staging (Acceptance), and only then deploy to Production.

Exercise 4: What Should CI Run Before Deploy?

Scenario

Your team uses GitHub Actions for CI. What two types of checks should run before DAGs are deployed to the staging Airflow instance?

Answer


(1) DAG integrity — Parse all DAGs (e.g. with DagBag) and assert no import errors. (2) Unit tests — Run pytest (or similar) for task logic and helpers. Both must pass before deploy.

Module 11 Quiz

Test your understanding! For each question, settle on your answer before checking it.

1. What is the main goal of DAG integrity tests?

2. What does airflow dags test <dag_id> <date> do?

3. Why do we mock external systems in unit tests?

4. What does CI/CD stand for?

5. What does the "T" in DTAP stand for?

6. Which tool is commonly used to run unit tests in Python/Airflow projects?

7. In DTAP, which environment should you deploy to last?

8. What does DagBag do in a DAG integrity test?