MODULE 4 OF 15

Operators Deep Dive

Master every major operator type — from Bash scripts to database queries to email alerts!

What are Operators?

In Airflow, operators are pre-built "task templates" that do specific jobs. Each operator knows exactly how to perform one type of action — run a bash command, execute a Python function, send an email, or query a database. When you add an operator to your DAG, you're adding a task that will do that specific job.

Explain Like I'm 5

Operators are like different kitchen appliances. A blender blends. A toaster toasts. A microwave heats. Each appliance does one thing really well. You don't use a blender to make toast!

Operators are the same — BashOperator runs shell commands. PythonOperator runs Python code. EmailOperator sends emails. Each one is perfect for its job. Pick the right "appliance" for what you need to do!

Airflow comes with dozens of operators built-in, and hundreds more are available through "provider packages" from the community. Mastering operators means you can build any pipeline you can imagine.

Key Takeaway

Operators = task templates. Each operator type performs a specific action. You choose the operator that matches what you want your task to do.

BashOperator Deep Dive

The BashOperator runs shell commands. It's perfect for running scripts, chaining commands with && or ;, and doing anything you'd do in a terminal.

Running Shell Commands, Scripts, and Chaining

You can run a single command: echo "Hello". You can chain commands: cd /tmp && ls -la. You can run a whole script: bash /path/to/script.sh. (One gotcha: if bash_command ends in .sh, Airflow tries to load that file as a Jinja template; the common workaround is to add a trailing space after the command.)

BashOperator Syntax

BashOperator(task_id="...", bash_command="...", env={...})

The env Parameter for Environment Variables

Pass a dictionary to env to set environment variables for your command. Want MY_DATE available in your script? Use env={"MY_DATE": "{{ ds }}"} and Airflow will render the template to the logical date. Note that when env is set, the command sees only those variables; in newer Airflow versions you can also set append_env=True to keep the inherited environment.

Working with Output

Output goes to the task log. In Airflow 2, the last line of stdout is pushed to XCom by default (controlled by the do_xcom_push parameter), so downstream tasks can read it. For complex data, write to a file or database instead!
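Putting these pieces together, here's a minimal sketch — the file path and task_id are illustrative:

from airflow.operators.bash import BashOperator

# Renders {{ ds }} into $MY_DATE; the last line of stdout is pushed
# to XCom because do_xcom_push defaults to True in Airflow 2.
count_rows = BashOperator(
    task_id="count_rows",
    bash_command='echo "rows for $MY_DATE: $(wc -l < /tmp/input.csv)"',
    env={"MY_DATE": "{{ ds }}"},  # available to the shell as $MY_DATE
)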

PythonOperator Deep Dive

The PythonOperator runs any Python function. It's the most flexible operator — if you can write it in Python, Airflow can run it.

python_callable, op_args, and op_kwargs

python_callable is the function to run. Pass positional arguments with op_args=[1, 2, 3] and keyword arguments with op_kwargs={"key": "value"}.

PythonOperator Syntax

PythonOperator(task_id="...", python_callable=my_func, op_args=[...], op_kwargs={...})

How Return Values Work

Whatever your function returns gets pushed to XCom automatically. Downstream tasks can pull it with ti.xcom_pull(task_ids='upstream_task').

Accessing Context (ti, ds, etc.)

Add **kwargs to your function signature to receive Airflow's context. You get ti (TaskInstance), ds (logical date as string), run_id, dag_run, and more!

Context Variables

ti = current task instance (for XCom, logging). ds = logical date as a string (YYYY-MM-DD). ds_nodash = same but without dashes (20240115). logical_date = the full datetime object (named execution_date in older Airflow versions).
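Here's a sketch tying these together — the function name and argument values are illustrative:

from airflow.operators.python import PythonOperator

def summarize(records, threshold=0, **context):
    # Filter the records and log the run's logical date from the context.
    kept = [r for r in records if r > threshold]
    print(f"Run date: {context['ds']}, kept {len(kept)} records")
    return len(kept)  # pushed to XCom automatically as return_value

summarize_task = PythonOperator(
    task_id="summarize",
    python_callable=summarize,
    op_args=[[3, 7, 1]],          # positional: records
    op_kwargs={"threshold": 2},   # keyword: threshold
)

A downstream callable could then read the result with ti.xcom_pull(task_ids="summarize").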

EmailOperator

The EmailOperator sends emails. Perfect for notifications on success/failure, daily reports, or alerts when something goes wrong.

Sending Notifications on Success/Failure

Use callback parameters on your DAG or tasks: on_success_callback, on_failure_callback. Or add an EmailOperator as the last task that runs only when the pipeline succeeds.

HTML Emails and Attachments

Set html_content for rich HTML emails. Use files=[] to attach files (e.g., a generated report). Configure SMTP in airflow.cfg or via the Admin → Connections UI.
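As a sketch (the recipient and attachment path are placeholders, and SMTP must already be configured):

from airflow.operators.email import EmailOperator

send_report = EmailOperator(
    task_id="send_report",
    to="team@example.com",                 # placeholder recipient
    subject="Pipeline report for {{ ds }}",
    html_content="<h3>Pipeline succeeded</h3><p>Run date: {{ ds }}</p>",
    files=["/tmp/daily_report.csv"],       # attachment must exist on the worker
)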

Real-World Use

Many teams use EmailOperator to send a "Pipeline completed successfully" summary every morning, or to alert on failure so someone can fix it before the next run.

SimpleHttpOperator

The SimpleHttpOperator makes HTTP requests. Call REST APIs, trigger webhooks, or fetch data from external services.

Making API Calls

Specify endpoint, method (GET, POST, etc.), data for request body, and headers. Use an HTTP Connection in Admin → Connections for the base URL and auth.

Response Handling

The response body is pushed to XCom by default. Use response_filter to extract just the part you need (e.g., lambda response: response.json()['data']).
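For example, here's a sketch assuming an HTTP connection named my_api holds the base URL and auth:

from airflow.providers.http.operators.http import SimpleHttpOperator

fetch_users = SimpleHttpOperator(
    task_id="fetch_users",
    http_conn_id="my_api",           # placeholder connection ID
    endpoint="api/v1/users",         # appended to the connection's base URL
    method="GET",
    headers={"Content-Type": "application/json"},
    # Keep only the payload instead of pushing the raw response body:
    response_filter=lambda response: response.json()["data"],
)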

SQL Operators

Airflow has PostgresOperator, MySqlOperator, SqliteOperator, and more. Each executes SQL against its database type.

Executing Queries, Creating Tables, Inserting Data

Pass your SQL as the sql parameter. Use parameters for safe parameterization. The operator uses the connection ID you specify (e.g., postgres_default).

SQL Operator Examples

1
CREATE TABLE

Run CREATE TABLE IF NOT EXISTS to set up schema before loading data.

2
INSERT / UPDATE

Execute INSERT or UPDATE with parameterized SQL for safe data loading.

3
SELECT Queries

Run SELECT for validation or to pull data; use parameters for templating.

4
PostgresOperator / MySqlOperator / SqliteOperator

Each uses its own connection type. Install providers: apache-airflow-providers-postgres, etc.
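Here's a sketch of the parameterized INSERT pattern from the list above, assuming the postgres_default connection and an illustrative orders table:

from airflow.providers.postgres.operators.postgres import PostgresOperator

insert_order = PostgresOperator(
    task_id="insert_order",
    postgres_conn_id="postgres_default",
    # An upstream task would typically run CREATE TABLE IF NOT EXISTS first.
    sql="INSERT INTO orders (id, amount) VALUES (%(id)s, %(amount)s);",
    parameters={"id": 42, "amount": 19.99},  # safe parameterization
)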

Transfer Operators

Transfer operators move data between systems. Examples: S3ToRedshiftOperator, LocalFilesystemToS3Operator, GoogleCloudStorageToBigQueryOperator.

Explain Like I'm 5

Transfer operators are like moving trucks. Instead of you writing code to copy files from S3 to PostgreSQL, the operator does it for you in one task. You tell it where to read from (source) and where to write to (destination), and it handles the rest.

Install provider packages (e.g., apache-airflow-providers-amazon) to use them. Configure connections for both source and destination.
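For instance, here's a sketch using LocalFilesystemToS3Operator — the bucket name and paths are placeholders:

from airflow.providers.amazon.aws.transfers.local_to_s3 import LocalFilesystemToS3Operator

upload_report = LocalFilesystemToS3Operator(
    task_id="upload_report",
    filename="/tmp/daily_report.csv",               # source on the worker
    dest_key="reports/{{ ds }}/daily_report.csv",   # templated destination key
    dest_bucket="my-bucket",                        # placeholder bucket
    aws_conn_id="aws_default",
    replace=True,                                   # overwrite if the key exists
)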

DummyOperator / EmptyOperator

DummyOperator and EmptyOperator are placeholder tasks. They do nothing when they run, but they're useful for organizing your DAG structure. (EmptyOperator is the current name; DummyOperator is its deprecated predecessor from older Airflow versions.)

When to Use

Use them when you need a "join" point for parallel branches (e.g., two tasks both feed into a dummy task before the next step). Or use them to group related tasks visually in the Graph view.
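A minimal sketch of the join pattern (task names are illustrative):

from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator

extract_a = BashOperator(task_id="extract_a", bash_command="echo A")
extract_b = BashOperator(task_id="extract_b", bash_command="echo B")
join = EmptyOperator(task_id="join")  # does nothing; just a merge point
load = BashOperator(task_id="load", bash_command="echo load")

# Both extracts must finish before load starts.
[extract_a, extract_b] >> join >> load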

How to Choose the Right Operator

Follow this decision tree when picking an operator:

Operator Decision Tree

1
Need to run a shell command?

BashOperator

2
Need custom Python logic?

PythonOperator

3
Need to run SQL?

PostgresOperator, MySqlOperator, etc.

4
Need to call an API?

SimpleHttpOperator

5
Need to send email?

EmailOperator

6
Need to move data between systems?

Transfer Operators (S3ToRedshift, etc.)

7
Need a placeholder?

DummyOperator or EmptyOperator

Provider Packages

Many operators (Postgres, MySQL, HTTP, AWS, GCP, etc.) live in provider packages. They're installed separately from core Airflow.

Install Community Operators

pip install apache-airflow-providers-postgres apache-airflow-providers-http apache-airflow-providers-amazon

Each provider adds new operators and hooks. Check Airflow Providers docs for the full list. Version compatibility matters — match provider versions to your Airflow version.
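Once a provider is installed, its operators import from the airflow.providers namespace — for example:

from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator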

Operator Types Categorization

Operators fall into three main categories: Action operators (Bash, Python, SQL) do something; Transfer operators (S3→Postgres, GCS→BQ) move data; Sensors (FileSensor, S3KeySensor) wait for a condition.

Action Operators

BashOperator, PythonOperator, EmailOperator, PostgresOperator — they execute an action and finish.

Transfer Operators

S3ToRedshiftOperator, LocalFilesystemToS3Operator — they copy or move data from one system to another.

Sensors

FileSensor, S3KeySensor, HttpSensor — they wait (poll) until a condition is met before continuing.

Operator Type Comparison Cards

Each operator type has a specific job. Use these cards to compare and choose:

Action Operators

BashOperator, PythonOperator, EmailOperator, SimpleHttpOperator. They DO something — run a command, execute Python, send email, call an API.

Transfer Operators

S3ToRedshift, LocalToS3, etc. Move data from one system to another. Like a conveyor belt between storage systems.

Sensor Operators

FileSensor, S3KeySensor, etc. Wait for something to appear (a file, a key in S3) before continuing. They poll until ready.
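To make the polling behavior concrete, here's a FileSensor sketch — the path and timings are illustrative:

from airflow.sensors.filesystem import FileSensor

wait_for_file = FileSensor(
    task_id="wait_for_file",
    filepath="/data/incoming/{{ ds }}.csv",  # file to wait for
    fs_conn_id="fs_default",                 # filesystem connection
    poke_interval=60,                        # check every 60 seconds
    timeout=60 * 60,                         # fail after one hour of waiting
)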

PythonOperator Lifecycle

Watch how a PythonOperator task executes from start to finish:

PythonOperator Execution Flow


Task Starts

Inject op_args / op_kwargs (plus context)

Call python_callable

Execute Function

Return Value → XCom

Hooks vs Operators Architecture

Operators use Hooks under the hood. Hooks are interfaces to external systems; operators wrap them into tasks:

Operator

Task template. Defines WHAT runs (e.g., "execute this SQL"). Has retries, timeouts, and dependency settings.

Hook

Connection logic. Handles HOW to talk to Postgres, S3, APIs. Reusable across operators.

Connection

Stored credentials (host, user, password). Hooks load these from Airflow's metadata DB.

Key Insight

PostgresOperator uses PostgresHook. S3ToRedshiftOperator uses S3Hook and RedshiftHook. You can use hooks directly in PythonOperator for custom logic!
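For example, here's a sketch that uses PostgresHook directly inside a PythonOperator — the connection ID and table are placeholders:

from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook

def count_large_orders(**context):
    # Use the hook directly for logic no single operator covers.
    hook = PostgresHook(postgres_conn_id="postgres_default")
    rows = hook.get_records(
        "SELECT id, amount FROM orders WHERE amount > %s", parameters=[100]
    )
    print(f"Found {len(rows)} large orders on {context['ds']}")

check_orders = PythonOperator(task_id="check_orders", python_callable=count_large_orders)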

BashOperator

Here's a simple, complete DAG built from two BashOperators — don't worry, we'll break down every line:

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

# Define the DAG
with DAG(
    dag_id="my_first_dag",           # The name of your pipeline
    start_date=datetime(2024, 1, 1),  # When to start scheduling
    schedule="@daily",                # Run once every day
    catchup=False,                    # Don't run for past dates
) as dag:

    # Task 1: Print hello
    hello = BashOperator(
        task_id="say_hello",
        bash_command='echo "Hello, Airflow! Today is $(date)"',
    )

    # Task 2: Print goodbye
    goodbye = BashOperator(
        task_id="say_goodbye",
        bash_command='echo "Goodbye! Pipeline complete."',
    )

    # Set the order: hello runs first, then goodbye
    hello >> goodbye

Line-by-Line Breakdown

1
Imports

from airflow import DAG — We import the DAG class. This is the container for your entire pipeline.
from airflow.operators.bash import BashOperator — We import BashOperator so we can run shell commands.

2
DAG Definition

with DAG(...) as dag: — This creates a DAG. Everything indented inside the with block belongs to this DAG.
dag_id = unique name shown in the UI.
start_date = Airflow won't schedule runs before this date.
schedule="@daily" = run once per day at midnight.

3
Tasks

Each BashOperator is a task. It runs a bash command. The task_id is the unique name of that task within the DAG.

4
Dependencies

hello >> goodbye means "run hello first, then goodbye." The >> operator sets the execution order. Think of it as an arrow: hello → goodbye.

Key Airflow CLI Commands

You'll use these commands all the time. Memorize them!

# List all your DAGs
airflow dags list

# Trigger a DAG manually
airflow dags trigger my_first_dag

# Test a specific task (without recording in DB)
airflow tasks test my_first_dag say_hello 2024-01-01

# List tasks in a DAG
airflow tasks list my_first_dag

# Check scheduler health
airflow jobs check

# View configuration
airflow config list

Pro Tip

airflow tasks test is your best friend during development! It runs a single task without the scheduler, without recording results, and shows you the output immediately. Perfect for debugging.

Practice Exercises

Try these exercises to solidify what you've learned about operators. Don't peek at the answers until you've tried!

Exercise 1: Build a Multi-Operator DAG

Challenge

Build a DAG with at least 3 different operator types. For example: BashOperator (print date), PythonOperator (process data), and EmailOperator (send summary). Set dependencies so they run in order.

Hint

Import each operator from its module. Use op_args or op_kwargs to pass data to your Python callable. For EmailOperator, you need an SMTP connection configured.

Answer

Use BashOperator, PythonOperator, and EmailOperator. Chain with task1 >> task2 >> task3. Configure connections in Admin → Connections for email.

Exercise 2: Fix Broken Operator Code

Bug Hunt

This code has two problems. Find them: PythonOperator(task_id="run", python_callable=my_func, op_args=[1, 2]) — where my_func is defined as def my_func(x, y, **context). What's wrong?

Answer

Problem 1: In Airflow 1.x, the context is only injected into **context when provide_context=True is set; without that flag, **context stays empty. (Airflow 2.x injects context automatically, so this part works there.)
Problem 2: Binding x and y by position with op_args is fragile — if the signature changes, the call breaks. Prefer op_kwargs={"x": 1, "y": 2} so arguments are bound by name.

Exercise 3: Choose the Right Operator

Scenarios

A) Fetch CSV from a public URL and save to S3. B) Run a SQL TRUNCATE then INSERT. C) Wait for a file to appear, then process it. D) Join two parallel branches before the next step. Which operators for each?

Answer

A) SimpleHttpOperator (fetch) + S3Hook in PythonOperator, or a transfer operator if one exists.
B) PostgresOperator or MySqlOperator (or two tasks).
C) FileSensor (wait) + PythonOperator or BashOperator (process).
D) DummyOperator or EmptyOperator as the join point.

Exercise 4: Use env in BashOperator

Challenge

Create a BashOperator that runs echo $RUN_DATE and passes the Airflow logical date ({{ ds }}) as the RUN_DATE environment variable. The command should print the date when the DAG run executes.

# Solution:
BashOperator(
    task_id="print_date",
    bash_command='echo "Run date: $RUN_DATE"',
    env={"RUN_DATE": "{{ ds }}"},
)

Module 4 Quiz

Test your understanding of operators!

1. What are Airflow operators?

2. Which parameter does PythonOperator use to specify the function to run?

3. How do you pass environment variables to a BashOperator?

4. Which operator would you use to run a SQL INSERT on PostgreSQL?

5. What is the purpose of DummyOperator or EmptyOperator?

6. Which operator is best for making REST API calls?

7. How do you install community operators like PostgresOperator?

8. What does op_kwargs do in PythonOperator?