Build your own hooks, operators, and sensors!
Airflow ships with dozens of built-in operators, hooks, and sensors. But when you need to talk to an internal API, a proprietary database, or a service that doesn't have an Airflow integration yet, you build custom components. They keep your DAGs clean, reusable, and testable.
Think of built-in operators as toys that come in the box. Custom components are toys you build yourself: they do exactly what your pipeline needs, and you can use them again and again in different DAGs instead of copy-pasting the same Python code everywhere.
Your company uses a tool that doesn't have an Airflow provider. You create a Hook to manage the connection and an Operator that uses that hook.
Same "call our internal API" logic in 20 DAGs? Put it in one custom operator and pass parameters. One place to fix bugs and add features.
You need to wait until a file appears, a table has rows, or an API returns "ready." A custom sensor that pokes until the condition is true is the right tool.
Hooks = connection + interface to an external system (get a client, run a query). Operators = a single unit of work (use a hook and do one thing). Sensors = wait until a condition is met (poke until true). Build hooks first, then operators/sensors that use them.
A Hook encapsulates connection logic and provides a simple interface (e.g., get_conn()) that operators and sensors use. You subclass BaseHook, implement get_conn(), and call the inherited get_connection() to read credentials from Airflow Connections.
All hooks inherit from airflow.hooks.base.BaseHook. You override get_conn() to return whatever your task needs: a database connection, an API client, a session object, etc. Use self.get_connection(conn_id) to get login details from Airflow's Connections store so you never hardcode secrets.
A hook is like a phone charger: it connects your task (phone) to the outside world (outlet). get_conn() is "plug in and get power" — once you have the connection, you can run queries or call APIs. The connection details live in Airflow (Admin → Connections), not in your code.
Hooks should be stateless and idempotent: creating a connection or running a read should not change external state. Save "do something" logic for operators.
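To make that concrete, here is a small sketch of what the Connections store hands back, assuming a Connection with conn_id "my_api_default" has been created under Admin → Connections (the field values are illustrative):

from airflow.hooks.base import BaseHook

# Inside a hook you would call self.get_connection(...); the classmethod works the same way.
conn = BaseHook.get_connection("my_api_default")

print(conn.host)          # e.g. "https://internal-api.example.com"
print(conn.login)         # username, if one is set
print(conn.password)      # secret, stored in Airflow, never hardcoded in the DAG
print(conn.extra_dejson)  # the "Extra" JSON field, parsed into a dict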
An Operator represents one task in your DAG. You subclass BaseOperator, declare templatable fields (e.g., sql, path) in template_fields, and implement execute(context). Inside execute you typically instantiate your custom hook and perform the work.
BaseOperator gives you everything Airflow needs: task_id, retries, retry_delay, templates, etc. You add your own constructor arguments (e.g., conn_id, query) and put the real logic in execute(self, context). The return value is optional; if you return something, Airflow pushes it to XCom so downstream tasks can pull it (you can also call xcom_push explicitly).
Pass through **kwargs to BaseOperator so task_id, retries, and templates work.
Do one logical thing: run a query, call an API, write a file. Use your hook for connections.
Add attribute names to template_fields so Airflow can render Jinja (e.g., {{ ds }}) in those fields.
Sensors are operators that wait until a condition is true. You subclass BaseSensorOperator and implement poke(self, context). The sensor runs poke() repeatedly (every poke_interval seconds) until it returns True or the task times out.
Instead of execute, you implement poke(context). Return True when the condition is met (e.g., file exists, table has rows); return False to sleep and try again. Set poke_interval and timeout so you don't poll too aggressively or wait forever.
A sensor is like checking the mailbox every few minutes: "Is the package here?" If no, go back to sleep. If yes, you're done! poke is that "check once" action; Airflow keeps calling it until you say "yes" or time runs out.
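As a usage sketch of those knobs, here is how the built-in FileSensor might be configured (the path and intervals are illustrative; mode="reschedule" releases the worker slot between pokes):

from airflow.sensors.filesystem import FileSensor

# Inside a `with DAG(...):` block
wait_for_export = FileSensor(
    task_id="wait_for_export",
    filepath="/data/exports/{{ ds }}/orders.csv",  # templated, rendered per run
    poke_interval=300,    # check every 5 minutes instead of hammering the filesystem
    timeout=6 * 60 * 60,  # fail the task if nothing appears within 6 hours
    mode="reschedule",    # free the worker slot between pokes
)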
For a single DAG, you can put custom hooks/operators/sensors in a plugins/ folder or next to your DAG file. For reuse across many DAGs or teams, package them as a Python package (e.g., my_airflow_plugins) and install it in the Airflow environment. Then from my_airflow_plugins.operators import MyOperator in any DAG.
Airflow's plugins manager automatically loads code from the plugins/ folder (it is added to the Python path). Put Python files or packages there and import your hooks/operators directly in DAGs; optionally register extras (views, macros, etc.) via an AirflowPlugin class. Simple and built-in.
Build a wheel/sdist and pip install it in your Docker image or venv. Best for multiple repos or versioned, shared components.
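One possible layout for such a package (all names are illustrative) could be:

my_airflow_plugins/
    pyproject.toml
    my_airflow_plugins/
        __init__.py
        hooks/
            __init__.py
            my_api_hook.py
        operators/
            __init__.py
            my_api_sync.py
        sensors/
            __init__.py
            my_file_sensor.py

After a pip install, any DAG in that environment can import, for example, from my_airflow_plugins.operators.my_api_sync import MyApiSyncOperator.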
Set poke_interval and timeout wisely. Start with a hook that wraps your external system. Then build one operator and one sensor that use it. Once that works, expand to more operators or share the package with other teams.
Custom components sit on top of Airflow's base classes. Operators and sensors use hooks for connections; sensors add "wait until" behavior.
Base classes → custom implementations. Operators and sensors use the hook for connections.
How custom components fit into your project or a shared package:
One package (or plugins folder) holds hooks, operators, sensors; DAGs import and instantiate them.
A hook that returns a simple API client (e.g., for an internal service). Connection details come from Airflow Connections.
from airflow.hooks.base import BaseHook


class MyApiHook(BaseHook):
    conn_name_attr = "conn_id"
    default_conn_name = "my_api_default"
    conn_type = "generic"

    def __init__(self, conn_id: str = default_conn_name, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id

    def get_conn(self):
        conn = self.get_connection(self.conn_id)
        # Build your client from conn.host, conn.login, conn.password, conn.extra_dejson
        api_client = SomeApiClient(
            base_url=conn.host,
            api_key=conn.password,
            **conn.extra_dejson,
        )
        return api_client
An operator that uses the hook and runs one logical action (e.g., trigger a sync).
from airflow.models import BaseOperator

from .hooks.my_api_hook import MyApiHook


class MyApiSyncOperator(BaseOperator):
    template_fields = ("job_name",)

    def __init__(self, conn_id: str, job_name: str, **kwargs):
        super().__init__(**kwargs)
        self.conn_id = conn_id
        self.job_name = job_name

    def execute(self, context):
        hook = MyApiHook(conn_id=self.conn_id)
        client = hook.get_conn()
        result = client.trigger_sync(self.job_name)
        return result.get("run_id")  # optional: push to XCom
A sensor that pokes until a file exists (or any condition you implement).
import os

from airflow.sensors.base import BaseSensorOperator


class MyFileSensor(BaseSensorOperator):
    template_fields = ("path",)

    def __init__(self, path: str, poke_interval: int = 60, timeout: int = 3600, **kwargs):
        super().__init__(poke_interval=poke_interval, timeout=timeout, **kwargs)
        self.path = path

    def poke(self, context):
        return os.path.isfile(self.path)
Import your hook, operator, and sensor; instantiate them with the right conn_id or path; set dependencies with >>. Ensure the plugin package or plugins/ path is on Airflow's Python path.
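Putting that together, a DAG sketch might look like this, assuming Airflow 2.4+ (where schedule is the scheduling argument) and the illustrative import paths from the packaging example above:

import pendulum

from airflow import DAG
from my_airflow_plugins.operators.my_api_sync import MyApiSyncOperator  # illustrative path
from my_airflow_plugins.sensors.my_file_sensor import MyFileSensor      # illustrative path

with DAG(
    dag_id="my_api_pipeline",
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
    schedule="@daily",
    catchup=False,
) as dag:
    sync = MyApiSyncOperator(
        task_id="trigger_sync",
        conn_id="my_api_default",
        job_name="orders_{{ ds }}",  # rendered because job_name is in template_fields
        retries=2,                   # handled by BaseOperator thanks to **kwargs passthrough
    )

    wait_for_file = MyFileSensor(
        task_id="wait_for_result",
        path="/data/{{ ds }}/result.csv",  # rendered because path is in template_fields
        poke_interval=120,
        timeout=3600,
    )

    sync >> wait_for_file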
Use the ELI5 and step boxes to reason through each exercise. Try writing code on paper or in a file before peeking at the answers.
You need to run a query on a PostgreSQL database in three different DAGs. Should you put the "connect to Postgres" code in the operator or in a hook? Why?
Put it in a hook. Connection logic (get host, password, open connection) belongs in a hook so it can be reused by many operators and sensors. The operator should call hook.get_conn() and then run the query. That way you reuse one place for credentials and connection handling.
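For example, a sketch of the operator side, assuming the apache-airflow-providers-postgres package so PostgresHook can own the connection handling:

from airflow.models import BaseOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook


class PostgresQueryOperator(BaseOperator):
    template_fields = ("sql",)

    def __init__(self, conn_id: str, sql: str, **kwargs):
        super().__init__(**kwargs)
        self.conn_id = conn_id
        self.sql = sql

    def execute(self, context):
        # Connection handling stays in the hook; the operator only does the work.
        hook = PostgresHook(postgres_conn_id=self.conn_id)
        return hook.get_records(self.sql)  # rows are pushed to XCom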
You need to wait until a table has at least 100 rows before continuing the pipeline. Should you use a custom Operator or a custom Sensor? What method do you implement?
Use a Sensor and implement poke(context). In poke, run a SELECT COUNT(*) (via a hook), return True if count ≥ 100, else False. Airflow will call poke every poke_interval until it returns True or the task times out.
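A sketch of that sensor, again assuming PostgresHook from the postgres provider (table and connection names are illustrative):

from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.sensors.base import BaseSensorOperator


class TableHasRowsSensor(BaseSensorOperator):
    def __init__(self, conn_id: str, table: str, min_rows: int = 100, **kwargs):
        super().__init__(**kwargs)
        self.conn_id = conn_id
        self.table = table
        self.min_rows = min_rows

    def poke(self, context):
        hook = PostgresHook(postgres_conn_id=self.conn_id)
        count = hook.get_first(f"SELECT COUNT(*) FROM {self.table}")[0]
        self.log.info("Table %s has %s rows", self.table, count)
        return count >= self.min_rows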
You have a custom operator with a parameter output_path that should support Jinja, e.g. output_path="/data/{{ ds }}/result.csv". What do you need to add to your operator class?
# Add template_fields to your operator class:
class MyOperator(BaseOperator):
    template_fields = ("output_path",)  # Airflow will render Jinja in this field

    def __init__(self, output_path: str, **kwargs):
        super().__init__(**kwargs)
        self.output_path = output_path
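With that in place, a DAG can pass a templated path, which Airflow renders before execute() runs (e.g., /data/2024-01-01/result.csv on the January 1 run):

# Inside a `with DAG(...):` block
write_result = MyOperator(
    task_id="write_result",
    output_path="/data/{{ ds }}/result.csv",  # rendered because output_path is in template_fields
)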
Your DAG should: (1) call API to start a job, (2) wait until the job is done, (3) download the result file. How many custom components (hooks/operators/sensors) would you ideally create, and what would each do?
API hook with get_conn() returning the API client. Used by both the "start job" operator and the "wait until done" sensor.
"Start job" operator: in execute, get hook, call client.start_job(...), push job_id to XCom for the sensor.
"Job done" sensor: in poke, get job_id from XCom (or pass as templated param), call API to check status, return True when done.
"Download result" operator: either another custom operator using the same hook (client.download_result(...)) or a built-in operator such as HttpOperator. So ideally: 1 hook, 2 operators (start job, download result), and 1 sensor.
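A sketch of the "job done" sensor, reusing the MyApiHook from this lesson; the get_job_status client method is hypothetical:

from airflow.sensors.base import BaseSensorOperator

from .hooks.my_api_hook import MyApiHook


class MyApiJobDoneSensor(BaseSensorOperator):
    def __init__(self, conn_id: str, start_task_id: str = "start_job", **kwargs):
        super().__init__(**kwargs)
        self.conn_id = conn_id
        self.start_task_id = start_task_id

    def poke(self, context):
        # Pull the job_id that the "start job" operator returned (or xcom_push-ed).
        job_id = context["ti"].xcom_pull(task_ids=self.start_task_id)
        client = MyApiHook(conn_id=self.conn_id).get_conn()
        status = client.get_job_status(job_id)  # hypothetical client method
        return status == "done"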