Pass data between tasks and build clean Python pipelines!
In Airflow, each task runs in its own process, often on different workers. They don't share memory. So how does Task B get data from Task A? That's exactly what XCom and the TaskFlow API solve!
Like passing notes in class. Each student (task) is at their own desk, but they need to share answers. Task A writes the answer on a "note" and puts it in a special box. Task B grabs the note from the box. The "box" is XCom — a shared storage that tasks use to pass small pieces of data!
XCom (short for Cross-Communication) lets tasks exchange small pieces of data. It's stored in Airflow's metadata database, associated with a specific task instance and DAG run.
XCom stores key-value pairs associated with a task instance. Each piece of data has a key (e.g., 'my_result') and a value (the actual data — string, number, dict, list, etc.). By default, XCom lives in the metadata database.
Two core methods power XCom:
ti.xcom_push(key='my_key', value='my_value')
ti.xcom_pull(task_ids='task_name', key='my_key')
Inside a Python callable, you receive ti (TaskInstance) via **kwargs. Use it to push or pull XCom values. The key lets you store multiple values from the same task.
PythonOperator automatically pushes your function's return value to XCom. No need to call xcom_push manually!
When you return something from a PythonOperator callable, Airflow stores it under the key return_value. Downstream tasks pull it with ti.xcom_pull(task_ids='upstream_task') — if you omit the key, it defaults to return_value.
You can pull XCom inside templated fields (e.g., bash_command, sql) using Jinja:
{{ ti.xcom_pull(task_ids='upstream_task') }}
{{ task_instance.xcom_pull(task_ids='...', key='my_key') }}
Airflow exposes ti and task_instance in the template context. Use them to inject data from upstream tasks into strings, SQL, or commands.
In Airflow 2.x, the @task decorator is the modern way to build Python-based DAGs. It wraps a function and turns it into a task automatically.
No need to instantiate PythonOperator manually. Decorate your function and it becomes a task.
Whatever you return is automatically pushed to XCom. No xcom_push needed.
When you write result = my_task() and then other_task(result), Airflow infers the dependency chain. Much cleaner than task_a >> task_b!
TaskFlow is like having a helper who passes the note for you. You just write normal Python functions that call each other. Airflow figures out the order and where to store the "notes" (XCom) behind the scenes. MUCH cleaner than the traditional approach!
You can mix @task-decorated functions with traditional operators like BashOperator or PostgresOperator. Use task_obj = my_task() to get the task object, then chain it: task_obj >> bash_task. To pull XCom from a TaskFlow task in a traditional operator, use templating: {{ ti.xcom_pull(task_ids='my_task') }}.
XCom lives in the metadata database and is meant for small payloads; the practical size cap depends on the database backend (roughly 64 KB on MySQL, about 1 GB on Postgres). Never push Pandas DataFrames, large files, or big JSON blobs. Use external storage (S3, GCS, a database table, or a shared file path) and pass only references (e.g., S3 paths) via XCom.
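Here is a minimal sketch of that reference-passing pattern. The DAG id, task names, file path, and Parquet format are illustrative, and it assumes both tasks can reach the same filesystem; in production you would pass an S3/GCS URI instead:

from datetime import datetime

import pandas as pd

from airflow import DAG
from airflow.decorators import task

with DAG(dag_id="pass_by_reference", start_date=datetime(2024, 1, 1), schedule="@daily", catchup=False) as dag:

    @task
    def make_dataframe():
        df = pd.DataFrame({"value": range(100_000)})
        path = "/tmp/data.parquet"  # illustrative; use an S3/GCS URI in production
        df.to_parquet(path)         # the big payload goes to external storage
        return path                 # only the tiny reference goes to XCom

    @task
    def process_dataframe(path):
        df = pd.read_parquet(path)  # follow the reference to load the real data
        print(f"Loaded {len(df)} rows")

    process_dataframe(make_dataframe())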
You can configure Airflow to store XCom in S3 or GCS instead of the database: point the xcom_backend setting (in the [core] section of airflow.cfg, or the AIRFLOW__CORE__XCOM_BACKEND environment variable) at a class that subclasses BaseXCom. This allows larger payloads and reduces database load. You can write such a backend yourself, and newer provider packages ship ready-made object-storage backends.
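A rough sketch of a custom S3 backend, following the common community pattern. It assumes the apache-airflow-providers-amazon package, a configured aws_default connection, and a hypothetical my-xcom-bucket; the exact serialize_value signature varies slightly across Airflow versions, which the **kwargs absorbs:

import json
import uuid

from airflow.models.xcom import BaseXCom
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

class S3XComBackend(BaseXCom):
    """Offloads large XCom values to S3, keeping only a reference in the DB."""

    PREFIX = "s3-xcom://"      # marker so deserialize_value knows what to fetch
    BUCKET = "my-xcom-bucket"  # hypothetical bucket name

    @staticmethod
    def serialize_value(value, **kwargs):
        # Simplistic rule for illustration: offload anything JSON-like to S3
        if isinstance(value, (dict, list)):
            key = f"xcom/{uuid.uuid4()}.json"
            S3Hook().load_string(
                json.dumps(value), key=key,
                bucket_name=S3XComBackend.BUCKET, replace=True,
            )
            value = S3XComBackend.PREFIX + key  # store only the reference
        return BaseXCom.serialize_value(value)

    @staticmethod
    def deserialize_value(result):
        value = BaseXCom.deserialize_value(result)
        if isinstance(value, str) and value.startswith(S3XComBackend.PREFIX):
            key = value[len(S3XComBackend.PREFIX):]
            value = json.loads(S3Hook().read_key(key, bucket_name=S3XComBackend.BUCKET))
        return value

# Activate it in airflow.cfg:  [core] xcom_backend = my_plugin.S3XComBackend
# or via:  AIRFLOW__CORE__XCOM_BACKEND=my_plugin.S3XComBackend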
Return a dictionary from a @task-decorated function with multiple_outputs=True to push multiple values; each key then becomes its own XCom entry. Downstream tasks can pull specific keys: ti.xcom_pull(task_ids='my_task', key='count'). For multiple named values, returning {"count": 42, "name": "pipeline"} is cleaner than pushing each with xcom_push. (Without multiple_outputs=True, the whole dict is stored under the single return_value key.)
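A quick sketch of that pattern (the DAG id and task names are illustrative):

from datetime import datetime

from airflow import DAG
from airflow.decorators import task

with DAG(dag_id="multiple_outputs_demo", start_date=datetime(2024, 1, 1), schedule="@daily", catchup=False) as dag:

    @task(multiple_outputs=True)
    def get_stats():
        # 'count' and 'name' are stored as separate XCom entries
        return {"count": 42, "name": "pipeline"}

    @task
    def report(count):
        print(f"Processed {count} records")

    stats = get_stats()
    report(stats["count"])  # pulls just the 'count' XCom from get_stats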
You now understand XCom, xcom_push/pull, return values, the TaskFlow API, and when NOT to use XCom. Next up: Variables & Connections!
Data flows from Task A to Task B via XCom. Here's how it works:
Task A pushes to XCom; metadata DB stores it; Task B pulls it
Task A calls ti.xcom_push(key, value) or returns a value. Data goes into the metadata DB (or custom backend).
XCom is key-value storage tied to a task instance and DAG run. Default: metadata DB. Can use S3/GCS backend for larger payloads.
Task B calls ti.xcom_pull(task_ids='A', key='...') or uses {{ ti.xcom_pull(...) }} in templates.
TaskFlow (@task) vs traditional PythonOperator + manual XCom:
TaskFlow: cleaner Python. Traditional: explicit control.
Default XCom backend stores rows in the xcom table. Each row: key, value, task_id, dag_id, run_id.
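As a sketch of what that storage looks like, you can query those rows from a Python shell with access to the metadata DB (the dag_id filter is illustrative, and in day-to-day work you'd use the XCom tab in the Airflow UI instead):

from airflow.models import XCom
from airflow.utils.session import create_session

# Illustrative peek at the xcom table; filter by whatever DAG you have run
with create_session() as session:
    for row in session.query(XCom).filter(XCom.dag_id == "xcom_dag").limit(10):
        print(row.run_id, row.task_id, row.key, XCom.deserialize_value(row))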
XCom Lifecycle
Traditional way to pass data between tasks using the TaskInstance:
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

def push_data(**kwargs):
    ti = kwargs["ti"]
    ti.xcom_push(key="my_key", value="my_value")
    ti.xcom_push(key="count", value=42)

def pull_data(**kwargs):
    ti = kwargs["ti"]
    val = ti.xcom_pull(task_ids="push_task", key="my_key")
    count = ti.xcom_pull(task_ids="push_task", key="count")
    print(f"Got: {val}, {count}")

with DAG(dag_id="xcom_dag", start_date=datetime(2024, 1, 1), schedule="@daily", catchup=False) as dag:
    push_task = PythonOperator(task_id="push_task", python_callable=push_data)
    pull_task = PythonOperator(task_id="pull_task", python_callable=pull_data)
    push_task >> pull_task
PythonOperator auto-pushes the return value under key return_value:
def get_count(**kwargs):
    return 100  # Auto-pushed as XCom key "return_value"

def use_count(**kwargs):
    ti = kwargs["ti"]
    count = ti.xcom_pull(task_ids="get_count")  # defaults to return_value
    print(f"Count is {count}")  # Count is 100
Modern Airflow 2.x style — return values become XCom automatically, and function calls create dependencies:
from datetime import datetime

from airflow import DAG
from airflow.decorators import task

with DAG(dag_id="taskflow_dag", start_date=datetime(2024, 1, 1), schedule="@daily", catchup=False) as dag:

    @task
    def extract():
        data = {"records": 50, "status": "ok"}
        return data  # Auto XCom

    @task
    def transform(data):
        data["records"] *= 2
        return data

    # transform receives extract()'s return!
    transformed = transform(extract())  # Dependency + data flow in one line
Passing extract() into transform() creates the dependency AND passes the XCom value. No manual xcom_pull needed!
Combine @task functions with BashOperator, and pull XCom in templates:
from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.operators.bash import BashOperator

with DAG(dag_id="mixed_dag", start_date=datetime(2024, 1, 1), schedule="@daily", catchup=False) as dag:

    @task
    def get_path():
        return "/tmp/data.csv"

    path_task = get_path()
    bash_task = BashOperator(
        task_id="process",
        bash_command='echo "Processing {{ ti.xcom_pull(task_ids="get_path") }}"',
    )
    path_task >> bash_task  # Chain TaskFlow task with operator
Try these exercises to solidify what you've learned about XCom and TaskFlow. Don't peek at the answers until you've tried!
Write a DAG with two PythonOperator tasks. Task A should compute result = 10 + 20 and push it to XCom with key 'sum'. Task B should pull that value and print it. Set the dependency so A runs before B.
Use ti.xcom_push(key='sum', value=30) in Task A. In Task B, use ti.xcom_pull(task_ids='task_a', key='sum'). Add **kwargs to both callables to receive ti.
Task A: def push_sum(**kwargs): kwargs['ti'].xcom_push(key='sum', value=30). Task B: def pull_sum(**kwargs): val = kwargs['ti'].xcom_pull(task_ids='task_a', key='sum'); print(val). Chain: task_a >> task_b.
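Assembled into a complete DAG (the dag_id and schedule are arbitrary), that answer might look like:

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

def push_sum(**kwargs):
    result = 10 + 20
    kwargs["ti"].xcom_push(key="sum", value=result)

def pull_sum(**kwargs):
    val = kwargs["ti"].xcom_pull(task_ids="task_a", key="sum")
    print(val)  # 30

with DAG(dag_id="exercise_one", start_date=datetime(2024, 1, 1), schedule="@daily", catchup=False) as dag:
    task_a = PythonOperator(task_id="task_a", python_callable=push_sum)
    task_b = PythonOperator(task_id="task_b", python_callable=pull_sum)
    task_a >> task_b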
Rewrite the previous exercise so Task A simply returns 30 instead of calling xcom_push. Task B should pull using the default key. What key does Airflow use when you return a value?
Task A: def compute(): return 30. Task B: ti.xcom_pull(task_ids='task_a') — no key needed! The default key is return_value. PythonOperator automatically pushes the return value under that key.
Convert your xcom_push/pull DAG to use the @task decorator. Have @task function get_data() return {"count": 42, "name": "pipeline"}. A second @task function process(data) receives it and prints data['count'].
@task def get_data(): return {"count": 42, "name": "pipeline"}. @task def process(data): print(data["count"]). Then: data = get_data(); process(data). Dependencies and XCom are automatic!
Task A generates a Pandas DataFrame with 100,000 rows. Task B needs to process it. Should you push the DataFrame via XCom? If not, what should you do instead?
Do NOT push the DataFrame to XCom! XCom has size limits (typically KB range). Instead: Task A saves the DataFrame to a file (e.g., Parquet on S3 or local path) and pushes only the path/URI via XCom. Task B pulls the path and loads the file.
Test your understanding of XCom and the TaskFlow API! Click on the answer you think is correct.