MODULE 8 OF 15

XCom & TaskFlow API

Pass data between tasks and build clean Python pipelines!

The Problem: Tasks Run Independently

In Airflow, each task runs in its own process, often on different workers. They don't share memory. So how does Task B get data from Task A? That's exactly what XCom and the TaskFlow API solve!

Explain Like I'm 5

Like passing notes in class. Each student (task) is at their own desk, but they need to share answers. Task A writes the answer on a "note" and puts it in a special box. Task B grabs the note from the box. The "box" is XCom — a shared storage that tasks use to pass small pieces of data!

What is XCom? (Cross-Communication)

XCom (short for Cross-Communication) lets tasks exchange small pieces of data. It's stored in Airflow's metadata database, associated with a specific task instance and DAG run.

Key Concepts

XCom stores key-value pairs associated with a task instance. Each piece of data has a key (e.g., 'my_result') and a value (the actual data — string, number, dict, list, etc.). By default, XCom lives in the metadata database.

xcom_push and xcom_pull

Two core methods power XCom:

Pushing Data

ti.xcom_push(key='my_key', value='my_value')

Pulling Data

ti.xcom_pull(task_ids='task_name', key='my_key')

Inside a Python callable, you receive ti (TaskInstance) via **kwargs. Use it to push or pull XCom values. The key lets you store multiple values from the same task.

Automatic XCom with Return Values

PythonOperator automatically pushes your function's return value to XCom. No need to call xcom_push manually!

The "return_value" Key

When you return something from a PythonOperator callable, Airflow stores it under the key return_value. Downstream tasks pull it with ti.xcom_pull(task_ids='upstream_task') — if you omit the key, it defaults to return_value.

XCom in Jinja Templates

You can pull XCom inside templated fields (e.g., bash_command, sql) using Jinja:

Template Syntax

{{ ti.xcom_pull(task_ids='upstream_task') }}

{{ task_instance.xcom_pull(task_ids='...', key='my_key') }}

Airflow exposes ti and task_instance in the template context. Use them to inject data from upstream tasks into strings, SQL, or commands.

The TaskFlow API (@task decorator)

In Airflow 2.x, the @task decorator is the modern way to build Python-based DAGs. It wraps a function and turns it into a task automatically.

TaskFlow Benefits

1. @task creates a PythonOperator

No need to instantiate PythonOperator manually. Decorate your function and it becomes a task.

2. Return values become XCom

Whatever you return is automatically pushed to XCom. No xcom_push needed.

3. Function calls create dependencies

When you write result = my_task() and then other_task(result), Airflow infers the dependency chain. Much cleaner than task_a >> task_b!


Explain Like I'm 5

TaskFlow is like having a helper who passes the note for you. You just write normal Python functions that call each other. Airflow figures out the order and where to store the "notes" (XCom) behind the scenes. MUCH cleaner than the traditional approach!

Mixing TaskFlow and Traditional Operators

You can mix @task-decorated functions with traditional operators like BashOperator or PostgresOperator. Use task_obj = my_task() to get the task object, then chain it: task_obj >> bash_task. To pull XCom from a TaskFlow task in a traditional operator, use templating: {{ ti.xcom_pull(task_ids='my_task') }}.

When NOT to Use XCom

XCom is NOT for Large Data

XCom is stored in the metadata database and has size limits (configurable but typically in the KB range). Never push Pandas DataFrames, large files, or big JSON blobs. Use external storage (S3, GCS, a database table, or a shared file path) and pass only references (e.g., S3 paths) via XCom.

  • Pandas DataFrames → Save to S3/GCS/DB, pass the path
  • Large files → Store in object storage, pass the URI
  • Huge JSON → Write to a file, pass the file path
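A minimal sketch of the pass-a-reference pattern, using only the standard library (the task ids and the temp-file location are illustrative):

```python
import json
import tempfile

# Task A's callable: write the large dataset to a file and return
# only the file path. PythonOperator auto-pushes the path to XCom.
def produce_large_data(**kwargs):
    rows = [{"id": i, "value": i * 2} for i in range(100_000)]
    fd, path = tempfile.mkstemp(suffix=".json")
    with open(fd, "w") as f:
        json.dump(rows, f)
    return path  # a small string goes into XCom, not 100k rows

# Task B's callable: pull the path from XCom and load the file.
def consume_large_data(**kwargs):
    ti = kwargs["ti"]
    path = ti.xcom_pull(task_ids="produce_large_data")
    with open(path) as f:
        rows = json.load(f)
    print(f"Loaded {len(rows)} rows from {path}")
```

In production you would swap the temp file for an S3/GCS URI or a database table, but the shape is the same: heavy data goes to external storage, only the reference travels through XCom.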

Custom XCom Backends

You can configure Airflow to store XCom in S3 or GCS instead of the database. Set xcom_backend in airflow.cfg or as an environment variable. This allows larger payloads and reduces database load. Providers like apache-airflow-providers-amazon include S3-compatible backends.
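A sketch of what the configuration looks like; the backend class path below is hypothetical and must point at your own (or a provider's) subclass of airflow.models.xcom.BaseXCom:

```ini
# airflow.cfg — xcom_backend lives in the [core] section
[core]
xcom_backend = my_company.xcom_backends.S3XComBackend

# Equivalent environment variable:
# AIRFLOW__CORE__XCOM_BACKEND=my_company.xcom_backends.S3XComBackend
```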

Multiple Outputs from @task

To push multiple values from a @task-decorated function, return a dictionary and set multiple_outputs=True on the decorator: @task(multiple_outputs=True). Each key then becomes its own XCom key, so downstream tasks can pull specific keys: ti.xcom_pull(task_ids='my_task', key='count'). Without multiple_outputs, the whole dict is stored under the single return_value key. For multiple named values, returning {"count": 42, "name": "pipeline"} is cleaner than pushing each with xcom_push.

Module 8 Complete!

You now understand XCom, xcom_push/pull, return values, the TaskFlow API, and when NOT to use XCom. Next up: Variables & Connections!

XCom Data Flow

Data flows from Task A to Task B via XCom. Here's how it works:

Task A (xcom_push) → XCom (metadata DB) → Task B (xcom_pull)

Task A pushes to XCom; metadata DB stores it; Task B pulls it

Push

Task A calls ti.xcom_push(key, value) or returns a value. Data goes into the metadata DB (or custom backend).

Store

XCom is key-value storage tied to a task instance and DAG run. Default: metadata DB. Can use S3/GCS backend for larger payloads.

Pull

Task B calls ti.xcom_pull(task_ids='A', key='...') or uses {{ ti.xcom_pull(...) }} in templates.

TaskFlow API vs Traditional

TaskFlow (@task) vs traditional PythonOperator + manual XCom:

TaskFlow (@task): return value → auto XCom; func(x) → auto dependency
Traditional: manual xcom_push/xcom_pull; explicit task_a >> task_b

TaskFlow: cleaner Python. Traditional: explicit control.

XCom in the Metadata DB

Default XCom backend stores rows in the xcom table. Each row: key, value, task_id, dag_id, run_id.

XCom Lifecycle


Task Runs → Push / Return → xcom Table → Downstream Pull

xcom_push and xcom_pull

Traditional way to pass data between tasks using the TaskInstance:

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def push_data(**kwargs):
    ti = kwargs["ti"]
    ti.xcom_push(key="my_key", value="my_value")
    ti.xcom_push(key="count", value=42)

def pull_data(**kwargs):
    ti = kwargs["ti"]
    val = ti.xcom_pull(task_ids="push_task", key="my_key")
    count = ti.xcom_pull(task_ids="push_task", key="count")
    print(f"Got: {val}, {count}")

with DAG(dag_id="xcom_dag", start_date=datetime(2024,1,1), schedule="@daily", catchup=False) as dag:
    push_task = PythonOperator(task_id="push_task", python_callable=push_data)
    pull_task = PythonOperator(task_id="pull_task", python_callable=pull_data)
    push_task >> pull_task

Automatic XCom via Return Values

PythonOperator auto-pushes the return value under key return_value:

def get_count(**kwargs):
    return 100   # Auto-pushed as xcom key "return_value"

def use_count(**kwargs):
    ti = kwargs["ti"]
    count = ti.xcom_pull(task_ids="get_count")  # defaults to return_value
    print(f"Count is {count}")   # Count is 100

TaskFlow API (@task decorator)

Modern Airflow 2.x style — return values become XCom automatically, and function calls create dependencies:

from airflow.decorators import task
from airflow import DAG
from datetime import datetime

with DAG(dag_id="taskflow_dag", start_date=datetime(2024,1,1), schedule="@daily", catchup=False) as dag:

    @task
    def extract():
        data = {"records": 50, "status": "ok"}
        return data   # Auto XCom

    @task
    def transform(data):
        data["records"] *= 2
        return data   # transform receives extract()'s return!

    transformed = transform(extract())   # Dependency + data flow in one line

TaskFlow Magic

Passing extract() into transform() creates the dependency AND passes the XCom value. No manual xcom_pull needed!

Mixing TaskFlow and Traditional Operators

Combine @task functions with BashOperator, and pull XCom in templates:

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.decorators import task
from datetime import datetime

with DAG(dag_id="mixed_dag", start_date=datetime(2024,1,1), schedule="@daily", catchup=False) as dag:

    @task
    def get_path():
        return "/tmp/data.csv"

    path_task = get_path()
    bash_task = BashOperator(
        task_id="process",
        bash_command='echo "Processing {{ ti.xcom_pull(task_ids="get_path") }}"',
    )
    path_task >> bash_task   # Chain TaskFlow task with operator

Practice Exercises

Try these exercises to solidify what you've learned about XCom and TaskFlow. Don't peek at the answers until you've tried!

Exercise 1: Pass Data with xcom_push and xcom_pull

Challenge

Write a DAG with two PythonOperator tasks. Task A should compute result = 10 + 20 and push it to XCom with key 'sum'. Task B should pull that value and print it. Set the dependency so A runs before B.

Hint

Use ti.xcom_push(key='sum', value=30) in Task A. In Task B, use ti.xcom_pull(task_ids='task_a', key='sum'). Add **kwargs to both callables to receive ti.

Answer

Task A: def push_sum(**kwargs): kwargs['ti'].xcom_push(key='sum', value=30). Task B: def pull_sum(**kwargs): val = kwargs['ti'].xcom_pull(task_ids='task_a', key='sum'); print(val). Chain: task_a >> task_b.

Exercise 2: Use Return Value as XCom

Challenge

Rewrite the previous exercise so Task A simply returns 30 instead of calling xcom_push. Task B should pull using the default key. What key does Airflow use when you return a value?

Answer

Task A: def compute(): return 30. Task B: ti.xcom_pull(task_ids='task_a') — no key needed! The default key is return_value. PythonOperator automatically pushes the return value under that key.

Exercise 3: Convert to TaskFlow API

Challenge

Convert your xcom_push/pull DAG to use the @task decorator. Have @task function get_data() return {"count": 42, "name": "pipeline"}. A second @task function process(data) receives it and prints data['count'].

Answer

@task def get_data(): return {"count": 42, "name": "pipeline"}. @task def process(data): print(data["count"]). Then: data = get_data(); process(data). Dependencies and XCom are automatic!

Exercise 4: When NOT to Use XCom

Scenario

Task A generates a Pandas DataFrame with 100,000 rows. Task B needs to process it. Should you push the DataFrame via XCom? If not, what should you do instead?

Answer

Do NOT push the DataFrame to XCom! XCom has size limits (typically KB range). Instead: Task A saves the DataFrame to a file (e.g., Parquet on S3 or local path) and pushes only the path/URI via XCom. Task B pulls the path and loads the file.

Module 8 Quiz

Test your understanding of XCom and the TaskFlow API! Click on the answer you think is correct.

1. What does XCom stand for?

2. How do you push data to XCom from a Python callable?

3. What key does PythonOperator use when it automatically pushes a return value?

4. What is the @task decorator?

5. When should you NOT use XCom?

6. How do you pull XCom in a Jinja template?

7. Where is XCom stored by default?

8. How do you return multiple values from a @task-decorated function?