MODULE 10 OF 15

Custom Components

Build your own hooks, operators, and sensors!

Why Custom Components?

Airflow ships with dozens of built-in operators, hooks, and sensors. But when you need to talk to an internal API, a proprietary database, or a service that doesn't have an Airflow integration yet, you build custom components. They keep your DAGs clean, reusable, and testable.

Explain Like I'm 5

Think of built-in operators as toys that come in the box. Custom components are toys you build yourself: they do exactly what your pipeline needs, and you can use them again and again in different DAGs instead of copy-pasting the same Python code everywhere.

When to Build Custom Components

1
No existing integration

Your company uses a tool that doesn't have an Airflow provider. You create a Hook to manage the connection and an Operator that uses that hook.

2
Reuse across many DAGs

Same "call our internal API" logic in 20 DAGs? Put it in one custom operator and pass parameters. One place to fix bugs and add features.

3
Wait for a condition

You need to wait until a file appears, a table has rows, or an API returns "ready." A custom sensor that pokes until the condition is true is the right tool.

The Big Three

Hooks = connection + interface to an external system (get a client, run a query). Operators = a single unit of work (use a hook and do one thing). Sensors = wait until a condition is met (poke until true). Build hooks first, then operators/sensors that use them.

Custom Hooks

A Hook encapsulates connection logic and provides a simple interface (e.g., get_conn()) that operators and sensors use. You subclass BaseHook and implement get_conn(); inside it, call self.get_connection(conn_id) (inherited from BaseHook) to read credentials from Airflow's Connections store.

BaseHook and get_conn

All hooks inherit from airflow.hooks.base.BaseHook. You override get_conn() to return whatever your task needs: a database connection, an API client, a session object, etc. Use self.get_connection(conn_id) to get login details from Airflow's Connections store so you never hardcode secrets.

Explain Like I'm 5

A hook is like a phone charger: it connects your task (phone) to the outside world (outlet). get_conn() is "plug in and get power" — once you have the connection, you can run queries or call APIs. The connection details live in Airflow (Admin → Connections), not in your code.

Remember

Hooks should be stateless and idempotent: creating a connection or running a read should not change external state. Save "do something" logic for operators.

Custom Operators

An Operator represents one task in your DAG. You subclass BaseOperator, declare templatable fields (e.g., sql, path) via template_fields, and implement execute(context). Inside execute you typically instantiate your custom hook and perform the work.

BaseOperator and execute

BaseOperator gives you everything Airflow needs: task_id, retries, retry_delay, templating, etc. You add your own constructor arguments (e.g., conn_id, query) and put the real logic in execute(self, context). Whatever execute returns is pushed to XCom by default (under the key return_value), so downstream tasks can pull it; return nothing if there is no data to pass on.

Operator Checklist

1
Call super().__init__()

Pass through **kwargs to BaseOperator so task_id, retries, and templates work.

2
Implement execute(context)

Do one logical thing: run a query, call an API, write a file. Use your hook for connections.

3
Use template_fields for parameters

Add attribute names to template_fields so Airflow can render Jinja (e.g., {{ ds }}) in those fields.

Custom Sensors

Sensors are operators that wait until a condition is true. You subclass BaseSensorOperator and implement poke(self, context). The sensor runs poke() repeatedly (every poke_interval seconds) until it returns True or the task times out.

BaseSensorOperator and poke

Instead of execute, you implement poke(context). Return True when the condition is met (e.g., file exists, table has rows); return False to sleep and try again. Set poke_interval and timeout so you don't poll too aggressively or wait forever.
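Conceptually, the poke loop works like this. Below is a simplified, stdlib-only sketch of what BaseSensorOperator does in "poke" mode (real Airflow also handles rescheduling, exponential backoff, and soft fails):

```python
import time

def run_sensor(poke, poke_interval: float = 60.0, timeout: float = 3600.0,
               _sleep=time.sleep) -> bool:
    """Simplified sketch of the loop BaseSensorOperator runs in 'poke' mode."""
    started = time.monotonic()
    while True:
        if poke():                      # your poke(context) returned True: done
            return True
        if time.monotonic() - started >= timeout:
            raise TimeoutError("Sensor timed out waiting for condition")
        _sleep(poke_interval)           # sleep, then check again
```

This makes the trade-off visible: a small poke_interval means faster reaction but more polling load; timeout bounds how long the task can occupy a worker slot.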

Explain Like I'm 5

A sensor is like checking the mailbox every few minutes: "Is the package here?" If no, go back to sleep. If yes, you're done! poke is that "check once" action; Airflow keeps calling it until you say "yes" or time runs out.

Packaging

For a single DAG, you can put custom hooks/operators/sensors in a plugins/ folder or next to your DAG file. For reuse across many DAGs or teams, package them as a Python package (e.g., my_airflow_plugins) and install it in the Airflow environment. Then from my_airflow_plugins.operators import MyOperator in any DAG.
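A typical layout for such a package might look like this (all names are illustrative):

```
my_airflow_plugins/
    __init__.py
    hooks/
        __init__.py
        my_api_hook.py
    operators/
        __init__.py
        my_api_sync_operator.py
    sensors/
        __init__.py
        my_file_sensor.py
```

The same hooks/, operators/, sensors/ split works inside a plugins/ folder for a single deployment.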

Where to Put Custom Code

1
plugins/ directory

Airflow adds plugins/ to the Python path, so DAGs can import hooks/operators placed there directly; an AirflowPlugin subclass is only needed when extending the UI or other plugin interfaces. Simple and built-in.

2
Installed package

Build a wheel/sdist and pip install it in your Docker image or venv. Best for multiple repos or versioned, shared components.
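A minimal pyproject.toml for such a package could look like this (names and versions are placeholders):

```toml
[build-system]
requires = ["setuptools>=61"]
build-backend = "setuptools.build_meta"

[project]
name = "my-airflow-plugins"
version = "0.1.0"
dependencies = ["apache-airflow>=2.0"]
```

Build with `python -m build`, then install the resulting wheel wherever Airflow runs.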

Best Practices

  • One logical action per operator. Don't chain five API calls in one operator; split or use a pipeline.
  • Use hooks for I/O. Operators should get connections/clients from hooks, not create them from scratch.
  • Make sensors efficient. Don't poke every second if the condition can only change every hour. Set poke_interval and timeout sensibly, and consider mode="reschedule" so the worker slot is freed between pokes.
  • Add template_fields for any parameter that should support Jinja (execution_date, ds, etc.).
  • Write unit tests for your hook and operator logic so refactors don't break DAGs.

Pro Tip

Start with a hook that wraps your external system. Then build one operator and one sensor that use it. Once that works, expand to more operators or share the package with other teams.

Hook / Operator / Sensor Hierarchy

Custom components sit on top of Airflow's base classes. Operators and sensors use hooks for connections; sensors add "wait until" behavior.

BaseHook (get_conn)          →  MyCustomHook
BaseOperator (execute)       →  MyOperator    (uses MyCustomHook)
BaseSensorOperator (poke)    →  MyFileSensor  (uses MyCustomHook)

Base classes → custom implementations. Operators and sensors use the hook for connections.

Packaging Diagram

How custom components fit into your project or a shared package:

plugins/  (or an installed package, my_airflow_plugins/)
    hooks/        operators/        sensors/

In a DAG file:
    from my_airflow_plugins.hooks import MyHook
    from my_airflow_plugins.operators import MyOperator
    MyOperator(task_id='run_it', conn_id='my_conn', ...)

One package (or plugins folder) holds hooks, operators, sensors; DAGs import and instantiate them.

Custom Hook Example

A hook that returns a simple API client (e.g., for an internal service). Connection details come from Airflow Connections.

from airflow.hooks.base import BaseHook

class MyApiHook(BaseHook):
    conn_name_attr = "conn_id"
    default_conn_name = "my_api_default"
    conn_type = "generic"

    def __init__(self, conn_id: str = default_conn_name, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id

    def get_conn(self):
        conn = self.get_connection(self.conn_id)
        # Build a client from conn.host, conn.login, conn.password, conn.extra_dejson.
        # SomeApiClient is a placeholder for your real client library's class.
        api_client = SomeApiClient(
            base_url=conn.host,
            api_key=conn.password,
            **conn.extra_dejson
        )
        return api_client

Custom Operator Example

An operator that uses the hook and runs one logical action (e.g., trigger a sync).

from airflow.models import BaseOperator
from .hooks.my_api_hook import MyApiHook

class MyApiSyncOperator(BaseOperator):
    template_fields = ("job_name",)

    def __init__(self, conn_id: str, job_name: str, **kwargs):
        super().__init__(**kwargs)
        self.conn_id = conn_id
        self.job_name = job_name

    def execute(self, context):
        hook = MyApiHook(conn_id=self.conn_id)
        client = hook.get_conn()
        result = client.trigger_sync(self.job_name)
        return result.get("run_id")  # returned value is pushed to XCom automatically

Custom Sensor Example

A sensor that pokes until a file exists (or any condition you implement).

import os

from airflow.sensors.base import BaseSensorOperator

class MyFileSensor(BaseSensorOperator):
    template_fields = ("path",)

    def __init__(self, path: str, poke_interval: int = 60, timeout: int = 3600, **kwargs):
        super().__init__(poke_interval=poke_interval, timeout=timeout, **kwargs)
        self.path = path

    def poke(self, context):
        return os.path.isfile(self.path)

Usage in a DAG

Import your hook, operator, and sensor; instantiate them with the right conn_id or path; set dependencies with >>. Ensure the plugin package or plugins/ path is on Airflow's Python path.
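Putting it together, a DAG using the components above might look like this (a sketch: MyApiSyncOperator and MyFileSensor are the custom classes defined earlier, and the connection id, job name, and paths are assumptions):

```python
from datetime import datetime

from airflow import DAG
from my_airflow_plugins.operators import MyApiSyncOperator
from my_airflow_plugins.sensors import MyFileSensor

with DAG(
    dag_id="custom_components_demo",
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,
) as dag:
    sync = MyApiSyncOperator(
        task_id="trigger_sync",
        conn_id="my_api_default",
        job_name="nightly_sync_{{ ds }}",   # rendered via template_fields
    )
    wait = MyFileSensor(
        task_id="wait_for_result",
        path="/data/{{ ds }}/result.csv",   # rendered via template_fields
        poke_interval=300,                  # check every 5 minutes
        timeout=6 * 3600,                   # give up after 6 hours
    )
    sync >> wait
```

This pipeline-definition fragment assumes the package is importable in the Airflow environment; the scheduler parses it, renders the templated fields per run, and executes trigger_sync before wait_for_result.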

Practice Exercises

Use the ELI5 and step boxes to reason through each exercise. Try writing code on paper or in a file before peeking at the answers.

Exercise 1: Where does connection logic go?

Scenario

You need to run a query on a PostgreSQL database in three different DAGs. Should you put the "connect to Postgres" code in the operator or in a hook? Why?

Answer

Put it in a hook. Connection logic (get host, password, open connection) belongs in a hook so it can be reused by many operators and sensors. The operator should call hook.get_conn() and then run the query. That way you reuse one place for credentials and connection handling.

Exercise 2: poke vs execute

Scenario

You need to wait until a table has at least 100 rows before continuing the pipeline. Should you use a custom Operator or a custom Sensor? What method do you implement?

Answer

Use a Sensor and implement poke(context). In poke, run a SELECT COUNT(*) (via a hook), return True if count ≥ 100, else False. Airflow will call poke every poke_interval until it returns True or the task times out.
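The body of that poke could be as small as this (a sketch using stdlib sqlite3 in place of a real database hook; the table name and threshold are illustrative, and the table name must come from trusted config, not user input):

```python
import sqlite3

def table_has_min_rows(db_path: str, table: str, min_rows: int = 100) -> bool:
    """What the sensor's poke() would do: count rows, compare to a threshold."""
    conn = sqlite3.connect(db_path)
    try:
        # table comes from trusted DAG config, so interpolation is acceptable here
        (count,) = conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()
    finally:
        conn.close()
    return count >= min_rows
```

In a real sensor, the connection would come from a hook's get_conn() instead of sqlite3.connect, and poke(context) would just `return table_has_min_rows(...)`.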

Exercise 3: template_fields

Challenge

You have a custom operator with a parameter output_path that should support Jinja, e.g. output_path="/data/{{ ds }}/result.csv". What do you need to add to your operator class?

# Add template_fields to your operator class:
class MyOperator(BaseOperator):
    template_fields = ("output_path",)   # Airflow will render Jinja in this field

    def __init__(self, output_path: str, **kwargs):
        super().__init__(**kwargs)
        self.output_path = output_path

Exercise 4: One operator, one action

Scenario

Your DAG should: (1) call API to start a job, (2) wait until the job is done, (3) download the result file. How many custom components (hooks/operators/sensors) would you ideally create, and what would each do?

Answer

1
One Hook

API hook with get_conn() returning the API client. Used by both the "start job" operator and the "wait until done" sensor.

2
One Operator

"Start job" operator: in execute, get hook, call client.start_job(...), push job_id to XCom for the sensor.

3
One Sensor

"Job done" sensor: in poke, get job_id from XCom (or pass as templated param), call API to check status, return True when done.

4
Download

Either another operator (same hook, client.download_result(...)) or a built-in operator like HttpOperator / custom "download file" operator. So: 1 hook, 2 operators, 1 sensor (or 2 operators if you combine "download" into one).

Module 10 Quiz

Test your understanding of custom hooks, operators, and sensors.

1. What does a Hook provide that Operators and Sensors use?

2. Which method do you implement in a custom Operator?

3. Which method do you implement in a custom Sensor?

4. What does poke() return when the condition is not yet met?

5. Where should you store connection credentials when building a custom Hook?

6. What is template_fields used for?

7. Custom hooks should be ___?

8. Where can you put custom hooks/operators so Airflow can load them?