Level: Intermediate

Marketing Analytics Pipeline

Automate daily ingestion from marketing platforms, transform with DBT, and surface campaign ROI in Metabase. Build an end-to-end pipeline from raw data to dashboard, perfect for portfolios and interviews.

Airflow · DBT · Python · Metabase

📚 Before you start, learn these courses:

Python → Airflow → DBT → Metabase

The Scenario

Imagine this…

Marketing runs campaigns on Google Ads, Facebook Ads, and Email. Every day they need to know: how much did we spend, how many impressions and clicks did we get, and what was the return? Did we make or lose money on each campaign?

Right now, someone manually downloads CSV reports from each platform, pastes them into a spreadsheet, and tries to calculate ROI. It's tedious, error-prone, and doesn't scale. You'll automate this entire process: build a pipeline that pulls data, cleans it, computes ROI, and surfaces it in a daily dashboard. Tomorrow it runs again, with zero manual work.

What You'll Build

APIs / Files → Python → Warehouse → DBT → Metabase
  • Data ingestion: Python script (or Airflow task) that pulls campaign spend, impressions, and clicks from APIs or staged CSV files into a data warehouse staging table.
  • Staging DBT models: Clean and type-cast raw campaign and conversion data (stg_campaigns, stg_conversions).
  • ROI mart model: Join spend and revenue and compute ROI = revenue / cost by campaign and channel.
  • Airflow DAG: Orchestrates ingest → dbt run → dbt test on a daily schedule. Runs automatically every morning.
  • Metabase dashboard: Questions for Spend by Channel, ROI by Campaign, and Daily Spend Trend. Marketing can self-serve insights.
  • Alerts: Metabase alert when ROI drops below a threshold so you catch problems early.

Prerequisites

Before you start, make sure you're comfortable with these. If not, we link to the exact courses below; finish those first, then come back.

SQL SELECT, WHERE, JOIN, GROUP BY, aggregate functions (SUM, COUNT, AVG). You should be able to join two tables and compute totals. PostgreSQL Course
Python basics Variables, loops, functions, reading/writing files. You'll write a script to generate or load campaign data. Python Course
Airflow intro What a DAG is, how to define tasks and dependencies. You don't need to be an expert; we'll walk through it. Airflow Course
DBT intro What DBT is, how to run dbt run and dbt test. You'll learn sources, refs, and models here. DBT Course
Metabase intro Connecting to a database, creating a question (query), and adding it to a dashboard. Metabase Course

Step-by-Step Plan

Here's the high-level roadmap. The Build It tab walks you through every step with copy-paste code, full explanations, and verification queries.

  1. Design the data model. Draw the schema: campaigns table (campaign_id, channel, date, cost, impressions, clicks) and conversions table (campaign_id, date, revenue, conversions). Write CREATE TABLE SQL.
  2. Create sample campaign data. Python script that generates realistic campaign data for Google Ads, Facebook Ads, and Email channels (~200 rows). Output as INSERT statements or CSV.
  3. Build staging DBT models. stg_campaigns.sql and stg_conversions.sql with cleaning and type casting. Define sources.yml.
  4. Build the ROI mart model. mart_campaign_roi.sql: join spend and revenue, compute ROI = revenue / cost. Full SQL with CTE pattern.
  5. Create the Airflow DAG. Python DAG file: ingest task → dbt_run → dbt_test. Schedule daily.
  6. Build Metabase dashboard. Connect to the warehouse, create questions (Spend by Channel, ROI by Campaign, Daily Spend Trend), and add them to a dashboard.
  7. Add alerts. Set up Metabase alerts for ROI dropping below a threshold.
Step 1: Design the Data Model

Draw the schema and write CREATE TABLE SQL

What we're doing and why

Before writing any code, we need to know what tables we're building. Marketing data has two main pieces: (1) spend: how much we paid for ads (cost, impressions, clicks by campaign and date), and (2) conversions: what we got back (revenue, conversions by campaign and date). We'll store these in two staging tables. Drawing the schema first prevents confusion later.

1a. Schema diagram

campaigns

campaign_id (part of composite PK)
channel (Google/Facebook/Email)
date
cost
impressions
clicks

↕ join on campaign_id, date

conversions

campaign_id (FK)
date
revenue
conversions

1b. CREATE TABLE SQL

Run this against your warehouse (PostgreSQL, Redshift, or similar). We use a marketing schema to keep our project tables separate.

SQL
-- Create a schema for marketing data (if it doesn't exist)
CREATE SCHEMA IF NOT EXISTS marketing;

-- campaigns: ad spend, impressions, clicks by campaign, channel, and date
CREATE TABLE marketing.campaigns (
    campaign_id     VARCHAR(50)  NOT NULL,
    channel         VARCHAR(50)  NOT NULL,
    date            DATE         NOT NULL,
    cost            DECIMAL(12,2),
    impressions     INTEGER,
    clicks          INTEGER,
    PRIMARY KEY (campaign_id, channel, date)
);

-- conversions: revenue and conversion count by campaign and date
CREATE TABLE marketing.conversions (
    campaign_id     VARCHAR(50)  NOT NULL,
    date            DATE         NOT NULL,
    revenue         DECIMAL(12,2),
    conversions     INTEGER,
    PRIMARY KEY (campaign_id, date)
);

-- Optional: a foreign key must reference a unique key. Since campaigns has a
-- composite PK (campaign_id, channel, date), this FK only works if you add a
-- separate campaign dimension table with campaign_id as its sole primary key:
-- ALTER TABLE marketing.conversions
-- ADD CONSTRAINT fk_campaigns
-- FOREIGN KEY (campaign_id) REFERENCES marketing.campaigns(campaign_id);

Why these columns?

  • campaign_id: Unique identifier for each campaign (e.g. "google_summer_2024").
  • channel: Where the ad ran: Google Ads, Facebook Ads, Email.
  • cost: How much we spent.
  • impressions / clicks: Reach and engagement.
  • revenue: Money earned from that campaign. ROI = revenue / cost.

Step 1 complete!

  • Schema designed: campaigns + conversions
  • Tables created in marketing schema
  • Primary keys defined for data integrity
Step 2: Create Sample Campaign Data

Python script to generate realistic campaign data (~200 rows)

What we're doing and why

In production, data comes from APIs (Google Ads API, Facebook Marketing API). For this project, we'll simulate that with a Python script that generates realistic data for Google Ads, Facebook Ads, and Email. We'll output ~200 rows that look like real campaign data. You can run this once to populate your tables, or use it as a template for real API integrations.

2a. Python script: generate_campaign_data.py

Python
"""
Generate realistic marketing campaign data for Google Ads, Facebook Ads, and Email.
Writes campaigns.csv and conversions.csv to the current directory.
Run: python generate_campaign_data.py
"""

import random
from datetime import datetime, timedelta
import csv

# Campaign IDs and channels
CAMPAIGNS = [
    ("google_summer_sale", "Google Ads"),
    ("google_brand_awareness", "Google Ads"),
    ("fb_retargeting", "Facebook Ads"),
    ("fb_lead_gen", "Facebook Ads"),
    ("email_newsletter", "Email"),
    ("email_promo", "Email"),
]

def generate_campaigns_data(num_days: int = 33) -> list:
    """Generate campaign spend, impressions, and clicks.

    One row per campaign per day: 6 campaigns x 33 days gives ~200 rows.
    Sequential dates keep (campaign_id, channel, date) unique, so the rows
    satisfy the table's primary key (random dates could collide).
    """
    rows = []
    start = datetime.now() - timedelta(days=num_days)

    for campaign_id, channel in CAMPAIGNS:
        base_cost = random.uniform(50, 500)
        base_imp = random.randint(1000, 50000)

        for i in range(num_days):
            d = start + timedelta(days=i)
            cost = round(base_cost * random.uniform(0.7, 1.3), 2)
            impressions = int(base_imp * random.uniform(0.8, 1.2))
            ctr = random.uniform(0.01, 0.05)  # click-through rate of 1-5%
            clicks = int(impressions * ctr)
            rows.append({
                "campaign_id": campaign_id,
                "channel": channel,
                "date": d.strftime("%Y-%m-%d"),
                "cost": cost,
                "impressions": impressions,
                "clicks": clicks,
            })

    return rows

def generate_conversions_data(campaigns_rows: list) -> list:
    """Generate revenue and conversions (subset of campaigns)."""
    seen = set()
    rows = []
    for r in campaigns_rows:
        key = (r["campaign_id"], r["date"])
        if key in seen:
            continue
        seen.add(key)
        if random.random() < 0.6:
            revenue = round(r["cost"] * random.uniform(0.5, 3.0), 2)
            conversions = random.randint(0, int(r["clicks"] * 0.1) + 1)
            rows.append({
                "campaign_id": r["campaign_id"],
                "date": r["date"],
                "revenue": revenue,
                "conversions": conversions,
            })
    return rows

def main():
    campaigns = generate_campaigns_data()
    conversions = generate_conversions_data(campaigns)

    with open("campaigns.csv", "w", newline="") as f:
        w = csv.DictWriter(f, fieldnames=["campaign_id", "channel", "date", "cost", "impressions", "clicks"])
        w.writeheader()
        w.writerows(campaigns)

    with open("conversions.csv", "w", newline="") as f:
        w = csv.DictWriter(f, fieldnames=["campaign_id", "date", "revenue", "conversions"])
        w.writeheader()
        w.writerows(conversions)

    print(f"Generated {len(campaigns)} campaign rows -> campaigns.csv")
    print(f"Generated {len(conversions)} conversion rows -> conversions.csv")

if __name__ == "__main__":
    main()

2b. Load the CSV into the warehouse

After running the script, load the CSVs. In PostgreSQL (psql):

SQL (psql)
\copy marketing.campaigns FROM '/path/to/campaigns.csv' WITH (FORMAT csv, HEADER true)
\copy marketing.conversions FROM '/path/to/conversions.csv' WITH (FORMAT csv, HEADER true)

Using DBeaver or pgAdmin?

Right-click the table → Import Data → select your CSV. Ensure "Header" is checked. The script outputs campaigns.csv and conversions.csv in the current directory.
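If you'd rather load the CSVs from Python than psql, here is a minimal sketch using psycopg2. The connection parameters and database name are placeholders, not values from this project; adjust them for your warehouse:

```python
# Sketch: load the generated CSVs into Postgres from Python with psycopg2.
def copy_sql(table: str) -> str:
    """Build the COPY ... FROM STDIN statement for a marketing table."""
    return f"COPY marketing.{table} FROM STDIN WITH (FORMAT csv, HEADER true)"

def load_csv(conn, table: str, path: str) -> None:
    """Stream a local CSV into the warehouse using psycopg2's copy_expert."""
    with conn.cursor() as cur, open(path) as f:
        cur.copy_expert(copy_sql(table), f)
    conn.commit()

# Example use (requires psycopg2 and a running Postgres; credentials below
# are placeholders):
#   import psycopg2
#   conn = psycopg2.connect(host="localhost", dbname="warehouse",
#                           user="postgres", password="postgres")
#   load_csv(conn, "campaigns", "campaigns.csv")
#   load_csv(conn, "conversions", "conversions.csv")
```

This mirrors what the psql \copy command does: the client reads the local file and streams it to the server.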

Step 2 complete!

  • Python script creates realistic campaign data for Google, Facebook, Email
  • ~200 rows in campaigns, matching conversions
  • CSVs loaded into marketing.campaigns and marketing.conversions
Step 3: Build Staging DBT Models

stg_campaigns.sql and stg_conversions.sql with cleaning and type casting

What we're doing and why

Raw tables often have messy data: extra spaces, wrong types, nulls where we don't want them. DBT staging models clean and standardize the data before we use it in business logic. stg_campaigns and stg_conversions are our "cleaned versions" of the raw tables. We'll use DBT's source() to read from the raw tables and ref() when other models depend on us.

3a. Define sources: sources.yml

Create models/staging/sources.yml so DBT knows where our raw tables live:

YAML (models/staging/sources.yml)
version: 2

sources:
  - name: marketing_raw
    description: "Raw marketing data loaded from APIs or CSV files"
    schema: marketing
    tables:
      - name: campaigns
        description: "Ad spend, impressions, clicks by campaign and date"
      - name: conversions
        description: "Revenue and conversion counts by campaign and date"

3b. stg_campaigns.sql

SQL (models/staging/stg_campaigns.sql)
{{ config(
    materialized='view',
    schema='staging'
) }}

SELECT
    TRIM(campaign_id)::VARCHAR(50)     AS campaign_id,
    TRIM(channel)::VARCHAR(50)         AS channel,
    date::DATE                         AS date,
    COALESCE(cost, 0)::DECIMAL(12,2)   AS cost,
    COALESCE(impressions, 0)::INTEGER  AS impressions,
    COALESCE(clicks, 0)::INTEGER       AS clicks
FROM {{ source('marketing_raw', 'campaigns') }}
WHERE campaign_id IS NOT NULL
  AND date IS NOT NULL
  AND COALESCE(cost, 0) >= 0

Line-by-line

  • TRIM(campaign_id): Removes leading/trailing spaces.
  • COALESCE(cost, 0): Replaces NULL cost with 0.
  • The WHERE clause keeps only rows with a non-null campaign_id and date, and drops invalid negative costs.

3c. stg_conversions.sql

SQL (models/staging/stg_conversions.sql)
{{ config(
    materialized='view',
    schema='staging'
) }}

SELECT
    TRIM(campaign_id)::VARCHAR(50)     AS campaign_id,
    date::DATE                         AS date,
    COALESCE(revenue, 0)::DECIMAL(12,2) AS revenue,
    COALESCE(conversions, 0)::INTEGER   AS conversions
FROM {{ source('marketing_raw', 'conversions') }}
WHERE campaign_id IS NOT NULL
  AND date IS NOT NULL
  AND COALESCE(revenue, 0) >= 0

3d. Run the staging models

Terminal
dbt run --select stg_campaigns stg_conversions

Verify in your database. Note: by default DBT prefixes a custom schema with your target schema (e.g. public_staging); override the generate_schema_name macro if you want plain staging:

SQL
SELECT * FROM staging.stg_campaigns LIMIT 5;
SELECT * FROM staging.stg_conversions LIMIT 5;

Step 3 complete!

  • sources.yml defines marketing_raw.campaigns and .conversions
  • stg_campaigns: trimmed, type-cast, nulls handled
  • stg_conversions: same cleaning for revenue and conversions
Step 4: Build the ROI Mart Model

mart_campaign_roi.sql: join spend and revenue, compute ROI

What we're doing and why

ROI (Return on Investment) = revenue / cost. If you spent $100 and made $300, ROI = 3 (or 300%). Marketing needs this by campaign and channel. We'll use CTEs (Common Table Expressions) to (1) sum cost by campaign/channel/date, (2) sum revenue by campaign/date, then (3) join them and compute ROI. This mart table is what Metabase will read.

4a. mart_campaign_roi.sql

SQL (models/marts/mart_campaign_roi.sql)
{{ config(
    materialized='table',
    schema='marts'
) }}

WITH spend AS (
    SELECT
        campaign_id,
        channel,
        date,
        SUM(cost)         AS total_cost,
        SUM(impressions)  AS total_impressions,
        SUM(clicks)       AS total_clicks
    FROM {{ ref('stg_campaigns') }}
    GROUP BY campaign_id, channel, date
),

revenue AS (
    SELECT
        campaign_id,
        date,
        SUM(revenue)      AS total_revenue,
        SUM(conversions)  AS total_conversions
    FROM {{ ref('stg_conversions') }}
    GROUP BY campaign_id, date
)

SELECT
    s.campaign_id,
    s.channel,
    s.date,
    s.total_cost,
    s.total_impressions,
    s.total_clicks,
    COALESCE(r.total_revenue, 0)      AS total_revenue,
    COALESCE(r.total_conversions, 0)  AS total_conversions,
    CASE
        WHEN s.total_cost > 0
        THEN ROUND(COALESCE(r.total_revenue, 0) / s.total_cost, 2)
        ELSE NULL
    END AS roi
FROM spend s
LEFT JOIN revenue r
    ON s.campaign_id = r.campaign_id
    AND s.date = r.date
ORDER BY s.date DESC, s.channel, s.campaign_id

In plain English

  • spend CTE: Sums cost, impressions, clicks per campaign/channel/date.
  • revenue CTE: Sums revenue and conversions per campaign/date.
  • Main SELECT: Joins spend and revenue on campaign_id + date; ROI = total_revenue / total_cost.
  • LEFT JOIN keeps campaigns with no conversions (revenue = 0, ROI = 0).
  • WHEN total_cost > 0 avoids division by zero.
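The join-and-divide logic can be mimicked in a few lines of Python with toy data (the variable names mirror the CTEs; the rows are made up for illustration):

```python
# Toy version of the mart join: spend rows LEFT JOINed to a revenue lookup,
# missing revenue coalesced to 0, ROI guarded against zero cost.
spend = [
    {"campaign_id": "fb_lead_gen", "date": "2024-06-01", "total_cost": 100.0},
    {"campaign_id": "email_promo", "date": "2024-06-01", "total_cost": 40.0},
]
revenue = {("fb_lead_gen", "2024-06-01"): 300.0}  # email_promo had no conversions

mart = []
for s in spend:
    total_revenue = revenue.get((s["campaign_id"], s["date"]), 0.0)  # LEFT JOIN + COALESCE
    roi = round(total_revenue / s["total_cost"], 2) if s["total_cost"] > 0 else None
    mart.append({**s, "total_revenue": total_revenue, "roi": roi})

print(mart)
# fb_lead_gen gets roi 3.0; email_promo still appears, with revenue 0.0 and roi 0.0
```

Note how the campaign with no conversions survives the join instead of disappearing, which is exactly why the SQL uses LEFT JOIN rather than INNER JOIN.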

4b. Run the mart

Terminal
dbt run --select mart_campaign_roi

Step 4 complete!

  • mart_campaign_roi joins spend and revenue
  • ROI = revenue / cost (with divide-by-zero protection)
  • Table created in marts schema, ready for Metabase
Step 5: Create the Airflow DAG

Orchestrate ingest → dbt_run → dbt_test daily

What we're doing and why

Without Airflow, you'd run ingestion and DBT manually. With Airflow, the pipeline runs on a schedule (e.g. every day at 6 AM). The DAG has three tasks: (1) ingest: run our Python script or load files, (2) dbt_run: run the DBT models, (3) dbt_test: run the DBT tests. If ingest fails, dbt_run doesn't run; if dbt_run fails, dbt_test doesn't run. Airflow handles retries and alerts.

5a. DAG file: marketing_pipeline_dag.py

Save this in your Airflow dags/ folder:

Python (dags/marketing_pipeline_dag.py)
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

# Paths β€” customize these for your environment
DBT_PROJECT_PATH = "/path/to/your/marketing_dbt"
INGEST_SCRIPT_PATH = "/path/to/generate_campaign_data.py"

def run_ingest():
    """Run the Python script to generate/ingest campaign data."""
    import subprocess
    subprocess.run(["python", INGEST_SCRIPT_PATH], check=True)
    # In production you might load CSVs into DB here, or call an API

with DAG(
    dag_id='marketing_analytics_pipeline',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',  # renamed to schedule= in Airflow 2.4+
    catchup=False,
    default_args={
        'retries': 2,
        'retry_delay': timedelta(minutes=5),
    },
    tags=['marketing', 'dbt'],
) as dag:

    ingest = PythonOperator(
        task_id='ingest_campaign_data',
        python_callable=run_ingest,
    )

    dbt_run = BashOperator(
        task_id='dbt_run',
        bash_command=f'cd {DBT_PROJECT_PATH} && dbt run',
    )

    dbt_test = BashOperator(
        task_id='dbt_test',
        bash_command=f'cd {DBT_PROJECT_PATH} && dbt test',
    )

    ingest >> dbt_run >> dbt_test

Simpler version (no Python ingest)

If you already have data loaded, replace the ingest task with a BashOperator that runs a SQL COPY or a simpler Python script, or use an EmptyOperator as a placeholder until you add real API ingestion.

Step 5 complete!

  • DAG runs daily (schedule_interval='@daily')
  • Tasks: ingest → dbt_run → dbt_test
  • Retries on failure (2 retries, 5 min delay)
Step 6: Build Metabase Dashboard

Connect, create questions, add to dashboard

What we're doing and why

Metabase is a BI tool that connects to your database and lets non-technical users run queries (as "questions") and view dashboards. We'll connect Metabase to our warehouse and create three questions: (1) Spend by Channel, (2) ROI by Campaign, (3) Daily Spend Trend. Then we'll add them to a "Marketing ROI" dashboard.

6a. Connect Metabase to your warehouse

In Metabase: Admin → Databases → Add database. Choose PostgreSQL (or Redshift). Enter host, port, database name, username, and password. Test the connection.

6b. Create Question 1: Spend by Channel

SQL (Metabase Question)
SELECT
    channel,
    SUM(total_cost)        AS total_spend,
    SUM(total_impressions) AS total_impressions,
    SUM(total_clicks)      AS total_clicks
FROM marts.mart_campaign_roi
GROUP BY channel
ORDER BY total_spend DESC

Save as "Spend by Channel". Visualization: Bar chart (channel on X, total_spend on Y).

6c. Create Question 2: ROI by Campaign

SQL (Metabase Question)
SELECT
    campaign_id,
    channel,
    SUM(total_cost)   AS total_cost,
    SUM(total_revenue) AS total_revenue,
    ROUND(AVG(roi), 2) AS avg_roi
FROM marts.mart_campaign_roi
GROUP BY campaign_id, channel
HAVING SUM(total_cost) > 0
ORDER BY avg_roi DESC

Save as "ROI by Campaign". Visualization: Table. Note that AVG(roi) averages the daily ROI values; for overall ROI you could instead compute SUM(total_revenue) / SUM(total_cost).

6d. Create Question 3: Daily Spend Trend

SQL (Metabase Question)
SELECT
    date,
    channel,
    SUM(total_cost) AS daily_spend
FROM marts.mart_campaign_roi
GROUP BY date, channel
ORDER BY date, channel

Save as "Daily Spend Trend". Visualization: Line chart (date on X, daily_spend on Y, channel as series).

6e. Create the dashboard

Dashboards → New dashboard → name it "Marketing ROI". Add the three questions. Arrange them: Spend by Channel (top), ROI by Campaign (middle), Daily Spend Trend (bottom). Set a filter for date range if needed.

Step 6 complete!

  • Metabase connected to warehouse
  • Three questions: Spend by Channel, ROI by Campaign, Daily Spend Trend
  • Dashboard "Marketing ROI" created with all three
Step 7: Add Alerts

Metabase alerts when ROI drops below threshold

What we're doing and why

You don't want to check dashboards every day. Metabase can send an email (or Slack message) when a question's result meets a condition, e.g. "alert me if any campaign's ROI is below 1.0 (losing money)". That way you catch problems without manual monitoring.

7a. Create an ROI threshold question

New question: campaigns with ROI below 1.0 (or another threshold):

SQL (Metabase Question)
SELECT
    campaign_id,
    channel,
    SUM(total_cost)   AS total_cost,
    SUM(total_revenue) AS total_revenue,
    ROUND(AVG(roi), 2) AS roi
FROM marts.mart_campaign_roi
GROUP BY campaign_id, channel
HAVING AVG(roi) < 1.0
   AND SUM(total_cost) > 100
ORDER BY roi ASC

Save as "Low ROI Campaigns". HAVING AVG(roi) < 1.0 filters to campaigns losing money. SUM(total_cost) > 100 ignores tiny test campaigns.
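The alert condition boils down to a simple filter. Here it is in plain Python, with made-up rows that mimic the aggregated "ROI by Campaign" output; the thresholds match the HAVING clause above:

```python
# The alert rule as a filter: ROI below 1.0 AND enough spend to matter.
rows = [
    {"campaign_id": "google_summer_sale", "total_cost": 900.0, "roi": 1.8},
    {"campaign_id": "fb_retargeting",     "total_cost": 450.0, "roi": 0.7},
    {"campaign_id": "email_test",         "total_cost": 20.0,  "roi": 0.1},
]

low_roi = [r for r in rows if r["roi"] < 1.0 and r["total_cost"] > 100]
print([r["campaign_id"] for r in low_roi])  # only fb_retargeting trips the alert
```

email_test is losing money too, but its spend is under the $100 floor, so it's ignored as a tiny test campaign, which is exactly what the SUM(total_cost) > 100 condition does in the SQL.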

7b. Set up the alert

Open the question → click the bell icon (Alerts) → Create alert → "When results exist" (so you get alerted when any row is returned). Add your email. Metabase will run this question on a schedule (e.g. daily) and email you if there are rows.

Step 7 complete!

  • Question "Low ROI Campaigns" created
  • Alert set: email when results exist (ROI < 1.0)
  • Marketing team gets notified automatically when campaigns underperform

Project complete!

You built a full marketing analytics pipeline from raw data to ROI dashboard. Here's what you can put on your resume:

  • Designed data model for campaigns and conversions
  • Built Python script for campaign data generation/ingestion
  • Created DBT staging and mart models with ROI calculation
  • Orchestrated pipeline with Airflow (ingest → dbt run → dbt test)
  • Built Metabase dashboard with Spend by Channel, ROI by Campaign, Daily Spend Trend
  • Configured Metabase alerts for low-ROI campaigns

What's next?

  • Connect real APIs (Google Ads API, Facebook Marketing API) instead of generated data
  • Add incremental DBT models to process only new data each day
  • Try the E-Commerce Data Warehouse project for medallion architecture
  • Add more marts: conversion funnel, attribution by channel
