Master every major operator type — from Bash scripts to database queries to email alerts!
In Airflow, operators are pre-built "task templates" that do specific jobs. Each operator knows exactly how to perform one type of action — run a bash command, execute a Python function, send an email, or query a database. When you add an operator to your DAG, you're adding a task that will do that specific job.
Operators are like different kitchen appliances. A blender blends. A toaster toasts. A microwave heats. Each appliance does one thing really well. You don't use a blender to make toast!
Operators are the same — BashOperator runs shell commands. PythonOperator runs Python code. EmailOperator sends emails. Each one is perfect for its job. Pick the right "appliance" for what you need to do!
Airflow comes with dozens of operators built-in, and hundreds more are available through "provider packages" from the community. Mastering operators means you can build any pipeline you can imagine.
Operators = task templates. Each operator type performs a specific action. You choose the operator that matches what you want your task to do.
The BashOperator runs shell commands. It's perfect for running scripts, chaining commands with && or ;, and doing anything you'd do in a terminal.
You can run a single command: echo "Hello". You can chain commands: cd /tmp && ls -la. You can run a whole script: bash /path/to/script.sh.
BashOperator(task_id="...", bash_command="...", env={...})
Pass a dictionary to the env parameter to set environment variables for your command. Want MY_DATE available in your script? Use env={"MY_DATE": "{{ ds }}"} and Airflow will substitute the logical date.
Output goes to the task log. By default (do_xcom_push=True), BashOperator pushes the last line of stdout to XCom so downstream tasks can read it. For complex data, write to a file or database instead!
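Putting those pieces together, here's a minimal sketch; the task name and script path are illustrative, not from a real project:

from airflow.operators.bash import BashOperator

run_report = BashOperator(
    task_id="run_report",  # illustrative task name
    # Chain commands with &&; the second runs only if the first succeeds
    bash_command='echo "Report for $RUN_DATE" && bash /opt/scripts/report.sh',  # hypothetical script path
    env={"RUN_DATE": "{{ ds }}"},  # env is a templated field, so Airflow fills in the logical date
)

Note that passing env replaces the task's inherited environment by default; on Airflow 2.3+ you can set append_env=True to merge with it instead.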
The PythonOperator runs any Python function. It's the most flexible operator — if you can write it in Python, Airflow can run it.
python_callable is the function to run. Pass positional arguments with op_args=[1, 2, 3] and keyword arguments with op_kwargs={"key": "value"}.
PythonOperator(task_id="...", python_callable=my_func, op_args=[...], op_kwargs={...})
Whatever your function returns gets pushed to XCom automatically. Downstream tasks can pull it with ti.xcom_pull(task_ids='upstream_task').
Add **kwargs to your function signature to receive Airflow's context. You get ti (TaskInstance), ds (logical date as string), run_id, dag_run, and more!
ti = the current task instance (for XCom, logging). ds = the logical date as a YYYY-MM-DD string. ds_nodash = the same without dashes (20240115). logical_date = the same as a datetime object (execution_date is its deprecated alias).
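A minimal sketch tying these together; the function and task names are illustrative:

from airflow.operators.python import PythonOperator

def greet(name, **kwargs):
    # **kwargs receives Airflow's context: ti, ds, run_id, dag_run, ...
    print(f"Hello {name}! Logical date: {kwargs['ds']}")
    return f"greeted {name}"  # the return value is pushed to XCom automatically

greet_task = PythonOperator(
    task_id="greet",
    python_callable=greet,          # pass the function itself, not greet()
    op_kwargs={"name": "Airflow"},  # keyword arguments for the callable
)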
The EmailOperator sends emails. Perfect for notifications on success/failure, daily reports, or alerts when something goes wrong.
Use callback parameters on your DAG or tasks: on_success_callback, on_failure_callback. Or add an EmailOperator as the last task that runs only when the pipeline succeeds.
Set html_content for rich HTML emails. Use files=[] to attach files (e.g., a generated report). Configure SMTP in airflow.cfg or via the Admin → Connections UI.
Many teams use EmailOperator to send a "Pipeline completed successfully" summary every morning, or to alert on failure so someone can fix it before the next run.
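Here's a hedged sketch of such a summary email; the recipient, subject, and attachment path are placeholders, and SMTP must already be configured:

from airflow.operators.email import EmailOperator

notify = EmailOperator(
    task_id="send_summary",
    to="team@example.com",                      # placeholder recipient
    subject="Pipeline succeeded for {{ ds }}",  # templated with the logical date
    html_content="<h3>All tasks completed for {{ ds }}</h3>",
    files=["/tmp/daily_report.csv"],            # hypothetical attachment path
)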
The SimpleHttpOperator makes HTTP requests. Call REST APIs, trigger webhooks, or fetch data from external services.
Specify endpoint, method (GET, POST, etc.), data for request body, and headers. Use an HTTP Connection in Admin → Connections for the base URL and auth.
The response body is pushed to XCom by default. Use response_filter to extract just the part you need (e.g., response_filter=lambda response: json.loads(response.text)['data']).
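For example, a sketch assuming an HTTP connection named my_api and a hypothetical /v1/data endpoint:

import json
from airflow.providers.http.operators.http import SimpleHttpOperator

fetch = SimpleHttpOperator(
    task_id="fetch_data",
    http_conn_id="my_api",   # connection holds the base URL and auth (assumed name)
    endpoint="/v1/data",     # hypothetical endpoint
    method="GET",
    headers={"Accept": "application/json"},
    # Keep only the part of the response we care about before it goes to XCom
    response_filter=lambda response: json.loads(response.text)["data"],
)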
Airflow has PostgresOperator, MySqlOperator, SqliteOperator, and more. Each executes SQL against its database type.
Pass your SQL as the sql parameter. Use parameters for safe parameterization. The operator uses the connection ID you specify (e.g., postgres_default).
Run CREATE TABLE IF NOT EXISTS to set up schema before loading data.
Execute INSERT or UPDATE with parameterized SQL for safe data loading.
Run SELECT for validation or to pull data; use parameters for templating.
Each uses its own connection type. Install providers: apache-airflow-providers-postgres, etc.
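Here's a sketch of a parameterized insert; the events table is hypothetical, and it assumes the postgres provider is installed:

from airflow.providers.postgres.operators.postgres import PostgresOperator

load = PostgresOperator(
    task_id="insert_event",
    postgres_conn_id="postgres_default",
    # %(name)s placeholders are filled safely from parameters (no string concatenation)
    sql="INSERT INTO events (name, happened_on) VALUES (%(name)s, %(day)s);",
    parameters={"name": "pipeline_run", "day": "{{ ds }}"},
)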
Transfer operators move data between systems. Examples: S3ToRedshiftOperator, LocalFilesystemToS3Operator, GCSToBigQueryOperator.
Transfer operators are like moving trucks. Instead of you writing code to copy files from S3 to PostgreSQL, the operator does it for you in one task. You tell it where to read from (source) and where to write to (destination), and it handles the rest.
Install provider packages (e.g., apache-airflow-providers-amazon) to use them. Configure connections for both source and destination.
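A sketch of S3ToRedshiftOperator from the Amazon provider; the bucket, schema, and table names are placeholders:

from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator

copy_to_redshift = S3ToRedshiftOperator(
    task_id="s3_to_redshift",
    s3_bucket="my-data-bucket",          # placeholder bucket
    s3_key="exports/{{ ds }}/data.csv",  # templated source key
    schema="analytics",                  # placeholder destination schema
    table="daily_events",                # placeholder destination table
    copy_options=["CSV"],                # passed through to Redshift's COPY command
    redshift_conn_id="redshift_default",
    aws_conn_id="aws_default",
)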
DummyOperator and EmptyOperator are placeholder tasks. They do nothing when they run, but they're useful for organizing your DAG structure.
Use them when you need a "join" point for parallel branches (e.g., two tasks both feed into a dummy task before the next step). Or use them to group related tasks visually in the Graph view.
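A minimal join sketch; the task names are illustrative, and EmptyOperator is the Airflow 2.3+ name for DummyOperator:

from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator

branch_a = BashOperator(task_id="branch_a", bash_command="echo A")
branch_b = BashOperator(task_id="branch_b", bash_command="echo B")
join = EmptyOperator(task_id="join")  # does nothing; purely a structural join point
report = BashOperator(task_id="report", bash_command="echo done")

# Both branches must finish before report starts
[branch_a, branch_b] >> join >> report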
Follow this decision tree when picking an operator:
Need to run a shell command or script? → BashOperator
Need to run arbitrary Python code? → PythonOperator
Need to execute SQL against a database? → PostgresOperator, MySqlOperator, etc.
Need to call a REST API or webhook? → SimpleHttpOperator
Need to send a notification email? → EmailOperator
Need to move data between systems? → Transfer Operators (S3ToRedshift, etc.)
Need a placeholder or a join point? → DummyOperator or EmptyOperator
Many operators (Postgres, MySQL, HTTP, AWS, GCP, etc.) live in provider packages. They're installed separately from core Airflow.
pip install apache-airflow-providers-postgres apache-airflow-providers-http apache-airflow-providers-amazon
Each provider adds new operators and hooks. Check Airflow Providers docs for the full list. Version compatibility matters — match provider versions to your Airflow version.
Operators fall into three main categories: Action operators perform work, Transfer operators move data, and Sensors wait until a condition is met.
Action operators: BashOperator, PythonOperator, EmailOperator, PostgresOperator — they execute an action and finish.
Transfer operators: S3ToRedshiftOperator, LocalFilesystemToS3Operator — they copy or move data from one system to another.
Sensors: FileSensor, S3KeySensor, HttpSensor — they wait (poll) until a condition is met before continuing.
Each operator type has a specific job. Compare and choose:
Action operators: BashOperator, PythonOperator, EmailOperator, SimpleHttpOperator. They DO something — run a command, execute Python, send an email, call an API.
Transfer operators: S3ToRedshiftOperator, LocalFilesystemToS3Operator, etc. They move data from one system to another, like a conveyor belt between storage systems.
Sensors: FileSensor, S3KeySensor, etc. They wait for something to appear (a file, a key in S3) before continuing, polling until ready.
How a PythonOperator task executes, start to finish: the scheduler queues the task, a worker imports the DAG and calls your python_callable (passing the context), the return value is pushed to XCom, and the task is marked success or retried on failure.
Operators use Hooks under the hood. Hooks are interfaces to external systems; operators wrap them into tasks:
Operator: the task template. Defines WHAT runs (e.g., "execute this SQL"). Carries task settings like retries, timeouts, and depends_on_past.
Hook: the connection logic. Handles HOW to talk to Postgres, S3, or APIs. Reusable across operators.
Connection: stored credentials (host, user, password). Hooks load these from Airflow's metadata DB.
PostgresOperator uses PostgresHook. S3ToRedshiftOperator uses S3Hook and RedshiftHook. You can use hooks directly in PythonOperator for custom logic!
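For instance, a sketch of calling PostgresHook directly inside a PythonOperator; the events table is a placeholder:

from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook

def count_rows(**kwargs):
    hook = PostgresHook(postgres_conn_id="postgres_default")
    # get_first returns the first row of the result set as a tuple
    count = hook.get_first("SELECT COUNT(*) FROM events;")[0]  # placeholder table
    print(f"events has {count} rows")
    return count  # pushed to XCom for downstream tasks

row_check = PythonOperator(task_id="count_rows", python_callable=count_rows)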
Here's the simplest possible DAG — don't worry, we'll break down every line:
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

# Define the DAG
with DAG(
    dag_id="my_first_dag",            # The name of your pipeline
    start_date=datetime(2024, 1, 1),  # When to start scheduling
    schedule="@daily",                # Run once every day
    catchup=False,                    # Don't run for past dates
) as dag:
    # Task 1: Print hello
    hello = BashOperator(
        task_id="say_hello",
        bash_command='echo "Hello, Airflow! Today is $(date)"',
    )

    # Task 2: Print goodbye
    goodbye = BashOperator(
        task_id="say_goodbye",
        bash_command='echo "Goodbye! Pipeline complete."',
    )

    # Set the order: hello runs first, then goodbye
    hello >> goodbye
from airflow import DAG — We import the DAG class. This is the container for your entire pipeline.
from airflow.operators.bash import BashOperator — We import BashOperator so we can run shell commands.
with DAG(...) as dag: — This creates a DAG. Everything indented inside the with block belongs to this DAG.
dag_id = unique name shown in the UI.
start_date = Airflow won't schedule runs before this date.
schedule="@daily" = run once per day at midnight.
Each BashOperator is a task. It runs a bash command. The task_id is the unique name of that task within the DAG.
hello >> goodbye means "run hello first, then goodbye." The >> operator sets the execution order. Think of it as an arrow: hello → goodbye.
You'll use these commands all the time. Memorize them!
# List all your DAGs
airflow dags list

# Trigger a DAG manually
airflow dags trigger my_first_dag

# Test a specific task (without recording in DB)
airflow tasks test my_first_dag say_hello 2024-01-01

# List tasks in a DAG
airflow tasks list my_first_dag

# Check scheduler health
airflow jobs check

# View configuration
airflow config list
airflow tasks test is your best friend during development! It runs a single task without the scheduler, without recording results, and shows you the output immediately. Perfect for debugging.
Try these exercises to solidify what you've learned about operators. Don't peek at the answers until you've tried!
Build a DAG with at least 3 different operator types. For example: BashOperator (print date), PythonOperator (process data), and EmailOperator (send summary). Set dependencies so they run in order.
Import each operator from its module. Use op_args or op_kwargs to pass data to your Python callable. For EmailOperator, you need an SMTP connection configured.
Use: BashOperator → PythonOperator → EmailOperator. Chain with task1 >> task2 >> task3. Configure connections in Admin → Connections for email.
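One possible solution skeleton; the recipient and the processing step are placeholders, and EmailOperator needs SMTP configured:

from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.operators.email import EmailOperator

def process_data(**kwargs):
    print(f"Processing data for {kwargs['ds']}")  # placeholder processing step

with DAG(dag_id="three_operators", start_date=datetime(2024, 1, 1),
         schedule="@daily", catchup=False) as dag:
    print_date = BashOperator(task_id="print_date", bash_command="date")
    process = PythonOperator(task_id="process_data", python_callable=process_data)
    summary = EmailOperator(task_id="send_summary", to="team@example.com",
                            subject="Summary for {{ ds }}",
                            html_content="<p>Pipeline complete.</p>")
    print_date >> process >> summary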
This code has 2 bugs. Find them: PythonOperator(task_id="run", python_callable=my_func(), op_args=[1]) — where my_func is defined as def my_func(x, y, **context). What's wrong?
Bug 1: python_callable=my_func() calls the function when the DAG file is parsed and passes its return value as the callable. Pass the function object itself: python_callable=my_func.
Bug 2: op_args=[1] supplies only x; y is required, so the task fails with a TypeError. Use op_args=[1, 2] or op_kwargs={'x': 1, 'y': 2}. The **context in the signature is fine; Airflow fills it with the context keyword arguments.
A) Fetch CSV from a public URL and save to S3. B) Run a SQL TRUNCATE then INSERT. C) Wait for a file to appear, then process it. D) Join two parallel branches before the next step. Which operators for each?
A) SimpleHttpOperator (fetch) + S3Hook in PythonOperator, or a transfer operator if one exists.
B) PostgresOperator or MySqlOperator (or two tasks).
C) FileSensor (wait) + PythonOperator or BashOperator (process) — see the sketch after this list.
D) DummyOperator or EmptyOperator as the join point.
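Scenario C's wait-then-process pattern, sketched; the file path is a placeholder and the filesystem connection is assumed to be the default:

from airflow.operators.bash import BashOperator
from airflow.sensors.filesystem import FileSensor

wait_for_file = FileSensor(
    task_id="wait_for_file",
    filepath="/data/incoming/report.csv",  # placeholder path
    fs_conn_id="fs_default",               # filesystem connection (assumed default)
    poke_interval=60,                      # re-check every 60 seconds
)
process = BashOperator(task_id="process_file",
                       bash_command="wc -l /data/incoming/report.csv")
wait_for_file >> process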
Create a BashOperator that runs echo $RUN_DATE and passes the Airflow logical date ({{ ds }}) as the RUN_DATE environment variable. The command should print the date when the DAG run executes.
# Solution:
BashOperator(
    task_id="print_date",
    bash_command='echo "Run date: $RUN_DATE"',
    env={"RUN_DATE": "{{ ds }}"},
)