Marketing Analytics Pipeline
Automate daily ingestion from marketing platforms, transform with DBT, and surface campaign ROI in Metabase. Build an end-to-end pipeline from raw data to dashboard – perfect for portfolios and interviews.
Before you start, take the courses listed under Prerequisites below.
The Scenario
Imagine this…
Marketing runs campaigns on Google Ads, Facebook Ads, and Email. Every day they need to know: how much did we spend, how many impressions and clicks did we get, and what was the return? Did we make or lose money on each campaign?
Right now, someone manually downloads CSV reports from each platform, pastes them into a spreadsheet, and tries to calculate ROI. It's tedious, error-prone, and doesn't scale. You'll automate this entire process: build a pipeline that pulls data, cleans it, computes ROI, and surfaces it in a daily dashboard. Tomorrow it runs again, with zero manual work.
What You'll Build
- Data ingestion: Python script (or Airflow task) that pulls campaign spend, impressions, clicks from APIs or staged CSV files into a data warehouse staging table.
- Staging DBT models: Clean and type-cast raw campaign and conversion data (stg_campaigns, stg_conversions).
- ROI mart model: Join spend + revenue and compute ROI = revenue / cost by campaign and channel.
- Airflow DAG: Orchestrates ingest → dbt run → dbt test on a daily schedule. Runs automatically every morning.
- Metabase dashboard: Questions for Spend by Channel, ROI by Campaign, Daily Spend Trend. Marketing can self-serve insights.
- Alerts: Metabase alert when ROI drops below a threshold so you catch problems early.
Prerequisites
Before you start, make sure you're comfortable with these. If not, we link to the exact courses below; finish those first, then come back.
| Skill | What you should know | Course |
| --- | --- | --- |
| SQL | SELECT, WHERE, JOIN, GROUP BY, aggregate functions (SUM, COUNT, AVG). You should be able to join two tables and compute totals. | PostgreSQL Course |
| Python basics | Variables, loops, functions, reading/writing files. You'll write a script to generate or load campaign data. | Python Course |
| Airflow intro | What a DAG is, how to define tasks and dependencies. You don't need to be an expert; we'll walk through it. | Airflow Course |
| DBT intro | What DBT is, how to run dbt run and dbt test. You'll learn sources, refs, and models here. | DBT Course |
| Metabase intro | Connecting to a database, creating a question (query), and adding it to a dashboard. | Metabase Course |
Step-by-Step Plan
Here's the high-level roadmap. The Build It tab walks you through every step with copy-paste code, full explanations, and verification queries.
1. Design the data model. Draw the schema: campaigns table (campaign_id, channel, date, cost, impressions, clicks) and conversions table (campaign_id, date, revenue, conversions). Write CREATE TABLE SQL.
2. Create sample campaign data. Python script that generates realistic campaign data for Google Ads, Facebook Ads, and Email channels (~200 rows). Output as INSERT statements or CSV.
3. Build staging DBT models. stg_campaigns.sql and stg_conversions.sql with cleaning and type casting. Define sources.yml.
4. Build the ROI mart model. mart_campaign_roi.sql: join spend + revenue, compute ROI = revenue / cost. Full SQL with CTE pattern.
5. Create the Airflow DAG. Python DAG file: ingest task → dbt_run → dbt_test. Schedule daily.
6. Build Metabase dashboard. Connect to warehouse, create questions (Spend by Channel, ROI by Campaign, Daily Spend Trend). Add them to a dashboard.
7. Add alerts. Set up Metabase alerts for ROI dropping below a threshold.
Design the Data Model
Draw the schema and write CREATE TABLE SQL
What we're doing and why
Before writing any code, we need to know what tables we're building. Marketing data has two main pieces: (1) spend, i.e. how much we paid for ads (cost, impressions, clicks by campaign and date), and (2) conversions, i.e. what we got back (revenue, conversions by campaign and date). We'll store these in two staging tables. Drawing the schema first prevents confusion later.
1a. Schema diagram
campaigns (PK: campaign_id + channel + date)
- campaign_id
- channel (Google / Facebook / Email)
- date
- cost
- impressions
- clicks

↕ joined on (campaign_id, date)

conversions (PK: campaign_id + date)
- campaign_id (FK)
- date
- revenue
- conversions
1b. CREATE TABLE SQL
Run this against your warehouse (PostgreSQL, Redshift, or similar). We use a marketing schema to keep our project tables separate.
-- Create a schema for marketing data (if it doesn't exist)
CREATE SCHEMA IF NOT EXISTS marketing;
-- campaigns: ad spend, impressions, clicks by campaign, channel, and date
CREATE TABLE marketing.campaigns (
campaign_id VARCHAR(50) NOT NULL,
channel VARCHAR(50) NOT NULL,
date DATE NOT NULL,
cost DECIMAL(12,2),
impressions INTEGER,
clicks INTEGER,
PRIMARY KEY (campaign_id, channel, date)
);
-- conversions: revenue and conversion count by campaign and date
CREATE TABLE marketing.conversions (
campaign_id VARCHAR(50) NOT NULL,
date DATE NOT NULL,
revenue DECIMAL(12,2),
conversions INTEGER,
PRIMARY KEY (campaign_id, date)
);
-- Optional: a foreign key from conversions.campaign_id to campaigns would require a
-- unique constraint on campaigns(campaign_id). Since the primary key above is composite
-- (campaign_id, channel, date), we skip the FK here and rely on DBT tests for referential
-- integrity instead (see Step 3).
Why these columns?
- campaign_id: Unique identifier for each campaign (e.g. "google_summer_2024").
- channel: Where the ad ran (Google Ads, Facebook Ads, or Email).
- cost: How much we spent.
- impressions / clicks: Reach and engagement.
- revenue: Money earned from that campaign.
ROI = revenue / cost.
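Optionally, you can sanity-check the schema with a couple of hand-made rows before loading real data. The values below are invented purely for illustration, and the DELETEs at the end clear them out so they don't mix with the generated data in Step 2:

-- Optional sanity check with made-up values: ROI should come out as 300 / 100 = 3.0
INSERT INTO marketing.campaigns (campaign_id, channel, date, cost, impressions, clicks)
VALUES ('google_summer_sale', 'Google Ads', '2024-06-01', 100.00, 5000, 150);
INSERT INTO marketing.conversions (campaign_id, date, revenue, conversions)
VALUES ('google_summer_sale', '2024-06-01', 300.00, 12);

SELECT c.campaign_id, c.cost, v.revenue, ROUND(v.revenue / c.cost, 2) AS roi
FROM marketing.campaigns c
JOIN marketing.conversions v USING (campaign_id, date);

-- Remove the test rows before loading the generated data
DELETE FROM marketing.conversions;
DELETE FROM marketing.campaigns;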
Step 1 complete!
- Schema designed: campaigns + conversions
- Tables created in the marketing schema
- Primary keys defined for data integrity
Create Sample Campaign Data
Python script to generate realistic campaign data (~200 rows)
What we're doing and why
In production, data comes from APIs (Google Ads API, Facebook Marketing API). For this project, we'll simulate that with a Python script that generates realistic data for Google Ads, Facebook Ads, and Email. We'll output ~200 rows that look like real campaign data. You can run this once to populate your tables, or use it as a template for real API integrations.
2a. Python script: generate_campaign_data.py
"""
Generate realistic marketing campaign data for Google Ads, Facebook Ads, and Email.
Outputs CSV files and optionally SQL INSERT statements.
Run: python generate_campaign_data.py
"""
import random
from datetime import datetime, timedelta
import csv
# Campaign IDs and channels
CAMPAIGNS = [
("google_summer_sale", "Google Ads"),
("google_brand_awareness", "Google Ads"),
("fb_retargeting", "Facebook Ads"),
("fb_lead_gen", "Facebook Ads"),
("email_newsletter", "Email"),
("email_promo", "Email"),
]
def generate_campaigns_data(num_days: int = 34) -> list:
    """Generate one spend row per campaign per day (6 campaigns x 34 days ≈ 200 rows)."""
    rows = []
    start = datetime.now() - timedelta(days=num_days)
    for campaign_id, channel in CAMPAIGNS:
        base_cost = random.uniform(50, 500)
        base_imp = random.randint(1000, 50000)
        for day_offset in range(num_days):
            # Sequential dates keep (campaign_id, channel, date) unique,
            # so the rows satisfy the primary key defined in Step 1.
            d = start + timedelta(days=day_offset)
            cost = round(base_cost * random.uniform(0.7, 1.3), 2)
            impressions = int(base_imp * random.uniform(0.8, 1.2))
            ctr = random.uniform(0.01, 0.05)  # click-through rate between 1% and 5%
            clicks = int(impressions * ctr)
            rows.append({
                "campaign_id": campaign_id,
                "channel": channel,
                "date": d.strftime("%Y-%m-%d"),
                "cost": cost,
                "impressions": impressions,
                "clicks": clicks,
            })
    return rows
def generate_conversions_data(campaigns_rows: list) -> list:
"""Generate revenue and conversions (subset of campaigns)."""
seen = set()
rows = []
for r in campaigns_rows:
key = (r["campaign_id"], r["date"])
if key in seen:
continue
seen.add(key)
if random.random() < 0.6:
revenue = round(r["cost"] * random.uniform(0.5, 3.0), 2)
conversions = random.randint(0, int(r["clicks"] * 0.1) + 1)
rows.append({
"campaign_id": r["campaign_id"],
"date": r["date"],
"revenue": revenue,
"conversions": conversions,
})
return rows
def main():
campaigns = generate_campaigns_data()
conversions = generate_conversions_data(campaigns)
with open("campaigns.csv", "w", newline="") as f:
w = csv.DictWriter(f, fieldnames=["campaign_id", "channel", "date", "cost", "impressions", "clicks"])
w.writeheader()
w.writerows(campaigns)
with open("conversions.csv", "w", newline="") as f:
w = csv.DictWriter(f, fieldnames=["campaign_id", "date", "revenue", "conversions"])
w.writeheader()
w.writerows(conversions)
print(f"Generated {len(campaigns)} campaign rows -> campaigns.csv")
print(f"Generated {len(conversions)} conversion rows -> conversions.csv")
if __name__ == "__main__":
main()
2b. Load the CSV into the warehouse
After running the script, load the CSVs. In PostgreSQL (psql):
\COPY marketing.campaigns FROM '/path/to/campaigns.csv' WITH (FORMAT csv, HEADER true);
\COPY marketing.conversions FROM '/path/to/conversions.csv' WITH (FORMAT csv, HEADER true);
Using DBeaver or pgAdmin?
Right-click the table → Import Data → select your CSV. Ensure "Header" is checked. The script outputs campaigns.csv and conversions.csv in the current directory.
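Prefer to load the CSVs from Python (handy later when this becomes the Airflow ingest task)? Here's a minimal sketch assuming psycopg2 is installed; the file name load_csvs.py and the connection string are our placeholders, not part of the project:

# load_csvs.py - minimal loader sketch (assumes psycopg2; adapt the DSN and paths)
import psycopg2

DSN = "postgresql://user:password@localhost:5432/warehouse"  # placeholder credentials

def load_csv(cur, table: str, path: str) -> None:
    """Stream a CSV file into the given table with COPY."""
    with open(path) as f:
        cur.copy_expert(f"COPY {table} FROM STDIN WITH (FORMAT csv, HEADER true)", f)

conn = psycopg2.connect(DSN)
cur = conn.cursor()
# Clear any previous load so re-runs don't violate the primary keys
cur.execute("TRUNCATE marketing.campaigns, marketing.conversions;")
load_csv(cur, "marketing.campaigns", "campaigns.csv")
load_csv(cur, "marketing.conversions", "conversions.csv")
conn.commit()
cur.close()
conn.close()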
Step 2 complete!
- Python script creates realistic campaign data for Google, Facebook, Email
- ~200 rows in campaigns, matching conversions
- CSVs loaded into marketing.campaigns and marketing.conversions
Build Staging DBT Models
stg_campaigns.sql and stg_conversions.sql with cleaning and type casting
What we're doing and why
Raw tables often have messy data: extra spaces, wrong types, nulls where we don't want them. DBT staging models clean and standardize the data before we use it in business logic. stg_campaigns and stg_conversions are our "cleaned versions" of the raw tables. We use DBT's source() to read from the raw tables, and downstream models use ref() to depend on these staging models.
3a. Define sources: sources.yml
Create models/staging/sources.yml so DBT knows where our raw tables live:
version: 2
sources:
- name: marketing_raw
description: "Raw marketing data loaded from APIs or CSV files"
schema: marketing
tables:
- name: campaigns
description: "Ad spend, impressions, clicks by campaign and date"
- name: conversions
description: "Revenue and conversion counts by campaign and date"
3b. stg_campaigns.sql
{{ config(
materialized='view',
schema='staging'
) }}
SELECT
TRIM(campaign_id)::VARCHAR(50) AS campaign_id,
TRIM(channel)::VARCHAR(50) AS channel,
date::DATE AS date,
COALESCE(cost, 0)::DECIMAL(12,2) AS cost,
COALESCE(impressions, 0)::INTEGER AS impressions,
COALESCE(clicks, 0)::INTEGER AS clicks
FROM {{ source('marketing_raw', 'campaigns') }}
WHERE campaign_id IS NOT NULL
AND date IS NOT NULL
AND cost >= 0
Line-by-line
TRIM(campaign_id): Removes leading/trailing spaces. COALESCE(cost, 0): Replaces NULL cost with 0. WHERE cost >= 0: Drops invalid negative costs. We keep only rows with non-null campaign_id and date.
3c. stg_conversions.sql
{{ config(
materialized='view',
schema='staging'
) }}
SELECT
TRIM(campaign_id)::VARCHAR(50) AS campaign_id,
date::DATE AS date,
COALESCE(revenue, 0)::DECIMAL(12,2) AS revenue,
COALESCE(conversions, 0)::INTEGER AS conversions
FROM {{ source('marketing_raw', 'conversions') }}
WHERE campaign_id IS NOT NULL
AND date IS NOT NULL
AND revenue >= 0
3d. Run the staging models
dbt run --select stg_campaigns stg_conversions
Verify in your database (depending on your DBT profile, the schema may be prefixed, e.g. analytics_staging instead of staging; the same applies to the marts schema in later steps):
SELECT * FROM staging.stg_campaigns LIMIT 5;
SELECT * FROM staging.stg_conversions LIMIT 5;
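The Airflow DAG in Step 5 runs dbt test, so it's worth defining a few tests now. A minimal models/staging/schema.yml might look like this (the exact tests are our suggestion, not a project requirement; the relationships test stands in for the foreign key we skipped in Step 1):

version: 2
models:
  - name: stg_campaigns
    columns:
      - name: campaign_id
        tests:
          - not_null
      - name: channel
        tests:
          - accepted_values:
              values: ['Google Ads', 'Facebook Ads', 'Email']
      - name: cost
        tests:
          - not_null
  - name: stg_conversions
    columns:
      - name: campaign_id
        tests:
          - not_null
          - relationships:
              to: ref('stg_campaigns')
              field: campaign_id
      - name: revenue
        tests:
          - not_null

Run them with dbt test --select stg_campaigns stg_conversions.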
Step 3 complete!
- sources.yml defines marketing_raw.campaigns and .conversions
- stg_campaigns: trimmed, type-cast, nulls handled
- stg_conversions: same cleaning for revenue and conversions
Build the ROI Mart Model
mart_campaign_roi.sql: join spend + revenue, compute ROI
What we're doing and why
ROI (Return on Investment) = revenue / cost. If you spent $100 and made $300, ROI = 3 (or 300%). Marketing needs this by campaign and channel. We'll use CTEs (Common Table Expressions) to (1) sum cost by campaign/channel/date, (2) sum revenue by campaign/date, then (3) join them and compute ROI. This mart table is what Metabase will read.
4a. mart_campaign_roi.sql
{{ config(
materialized='table',
schema='marts'
) }}
WITH spend AS (
SELECT
campaign_id,
channel,
date,
SUM(cost) AS total_cost,
SUM(impressions) AS total_impressions,
SUM(clicks) AS total_clicks
FROM {{ ref('stg_campaigns') }}
GROUP BY campaign_id, channel, date
),
revenue AS (
SELECT
campaign_id,
date,
SUM(revenue) AS total_revenue,
SUM(conversions) AS total_conversions
FROM {{ ref('stg_conversions') }}
GROUP BY campaign_id, date
)
SELECT
s.campaign_id,
s.channel,
s.date,
s.total_cost,
s.total_impressions,
s.total_clicks,
COALESCE(r.total_revenue, 0) AS total_revenue,
COALESCE(r.total_conversions, 0) AS total_conversions,
CASE
WHEN s.total_cost > 0
THEN ROUND(COALESCE(r.total_revenue, 0) / s.total_cost, 2)
ELSE NULL
END AS roi
FROM spend s
LEFT JOIN revenue r
ON s.campaign_id = r.campaign_id
AND s.date = r.date
ORDER BY s.date DESC, s.channel, s.campaign_id
In plain English
- spend CTE: sums cost, impressions, clicks per campaign/channel/date.
- revenue CTE: sums revenue and conversions per campaign/date.
- Main SELECT: joins spend and revenue on campaign_id + date, then ROI = total_revenue / total_cost.
- LEFT JOIN keeps campaigns with no conversions in the result (revenue = 0, so ROI = 0).
- CASE WHEN total_cost > 0 guards against division by zero.
4b. Run the mart
dbt run --select mart_campaign_roi
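After the run, a quick sanity check (same schema-name caveat as in Step 3) is to roll ROI up by channel; with the generated sample data the values should land roughly around 1:

-- ROI rolled up by channel; expect values scattered around 1 with the sample data
SELECT
    channel,
    SUM(total_cost) AS total_cost,
    SUM(total_revenue) AS total_revenue,
    ROUND(SUM(total_revenue) / NULLIF(SUM(total_cost), 0), 2) AS roi
FROM marts.mart_campaign_roi
GROUP BY channel
ORDER BY roi DESC;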
Step 4 complete!
- mart_campaign_roi joins spend and revenue
- ROI = revenue / cost (with divide-by-zero protection)
- Table created in the marts schema, ready for Metabase
Create the Airflow DAG
Orchestrate ingest → dbt_run → dbt_test daily
What we're doing and why
Without Airflow, you'd run ingestion and DBT manually. With Airflow, the pipeline runs on a schedule (e.g. every morning). The DAG has three tasks: (1) ingest runs our Python script or loads files, (2) dbt_run runs the DBT models, (3) dbt_test runs the DBT tests. If ingest fails, dbt_run doesn't run. If dbt_run fails, dbt_test doesn't run. Airflow handles retries and alerts.
5a. DAG file: marketing_pipeline_dag.py
Save this in your Airflow dags/ folder:
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
# Paths β customize these for your environment
DBT_PROJECT_PATH = "/path/to/your/marketing_dbt"
INGEST_SCRIPT_PATH = "/path/to/generate_campaign_data.py"
def run_ingest():
"""Run the Python script to generate/ingest campaign data."""
import subprocess
subprocess.run(["python", INGEST_SCRIPT_PATH], check=True)
# In production you might load CSVs into DB here, or call an API
with DAG(
dag_id='marketing_analytics_pipeline',
start_date=datetime(2024, 1, 1),
schedule_interval='@daily',
catchup=False,
default_args={
'retries': 2,
'retry_delay': timedelta(minutes=5),
},
tags=['marketing', 'dbt'],
) as dag:
ingest = PythonOperator(
task_id='ingest_campaign_data',
python_callable=run_ingest,
)
dbt_run = BashOperator(
task_id='dbt_run',
bash_command=f'cd {DBT_PROJECT_PATH} && dbt run',
)
dbt_test = BashOperator(
task_id='dbt_test',
bash_command=f'cd {DBT_PROJECT_PATH} && dbt test',
)
ingest >> dbt_run >> dbt_test
Simpler version (no Python ingest)
If you already have data loaded, replace the ingest task with a BashOperator that runs a SQL COPY or a simpler Python script, or use an EmptyOperator as a placeholder until you add real API ingestion (see the sketch below).
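A rough sketch of both options; the script path is a placeholder, and load_csvs.py refers to the hypothetical loader sketch from Step 2b:

# Drop one of these into the `with DAG(...)` block in place of the PythonOperator above.
from airflow.operators.empty import EmptyOperator  # available in Airflow 2.3+

# Option 1: do-nothing placeholder until real API ingestion exists
ingest = EmptyOperator(task_id='ingest_campaign_data')

# Option 2: run a loader script with the BashOperator already imported in the DAG file
# ingest = BashOperator(
#     task_id='ingest_campaign_data',
#     bash_command='python /path/to/load_csvs.py',
# )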
Step 5 complete!
- DAG runs daily (schedule_interval='@daily')
- Tasks: ingest → dbt_run → dbt_test
- Retries on failure (2 retries, 5 min delay)
Build Metabase Dashboard
Connect, create questions, add to dashboard
What we're doing and why
Metabase is a BI tool that connects to your database and lets non-technical users run queries (as "questions") and view dashboards. We'll connect Metabase to our warehouse, create three questions: (1) Spend by Channel, (2) ROI by Campaign, (3) Daily Spend Trend. Then we'll add them to a "Marketing ROI" dashboard.
6a. Connect Metabase to your warehouse
In Metabase: Admin → Databases → Add database. Choose PostgreSQL (or Redshift). Enter host, port, database name, username, password. Test the connection.
6b. Create Question 1: Spend by Channel
SELECT
channel,
SUM(total_cost) AS total_spend,
SUM(total_impressions) AS total_impressions,
SUM(total_clicks) AS total_clicks
FROM marts.mart_campaign_roi
GROUP BY channel
ORDER BY total_spend DESC
Save as "Spend by Channel". Visualization: Bar chart (channel on X, total_spend on Y).
6c. Create Question 2: ROI by Campaign
SELECT
campaign_id,
channel,
SUM(total_cost) AS total_cost,
SUM(total_revenue) AS total_revenue,
ROUND(AVG(roi), 2) AS avg_roi
FROM marts.mart_campaign_roi
GROUP BY campaign_id, channel
HAVING SUM(total_cost) > 0
ORDER BY avg_roi DESC
Save as "ROI by Campaign". Visualization: Table.
6d. Create Question 3: Daily Spend Trend
SELECT
date,
channel,
SUM(total_cost) AS daily_spend
FROM marts.mart_campaign_roi
GROUP BY date, channel
ORDER BY date, channel
Save as "Daily Spend Trend". Visualization: Line chart (date on X, daily_spend on Y, channel as series).
6e. Create the dashboard
Dashboards → New dashboard → name it "Marketing ROI". Add the three questions. Arrange them: Spend by Channel (top), ROI by Campaign (middle), Daily Spend Trend (bottom). Set a filter for date range if needed.
Step 6 complete!
- Metabase connected to warehouse
- Three questions: Spend by Channel, ROI by Campaign, Daily Spend Trend
- Dashboard "Marketing ROI" created with all three
Add Alerts
Metabase alerts when ROI drops below threshold
What we're doing and why
You don't want to check dashboards every day. Metabase can send an email (or Slack message) when a question's result meets a condition, e.g. "alert me if any campaign's ROI is below 1.0 (losing money)". That way you catch problems without manual monitoring.
7a. Create an ROI threshold question
New question: campaigns with ROI below 1.0 (or another threshold):
SELECT
campaign_id,
channel,
SUM(total_cost) AS total_cost,
SUM(total_revenue) AS total_revenue,
ROUND(AVG(roi), 2) AS roi
FROM marts.mart_campaign_roi
GROUP BY campaign_id, channel
HAVING AVG(roi) < 1.0
AND SUM(total_cost) > 100
ORDER BY roi ASC
Save as "Low ROI Campaigns". HAVING AVG(roi) < 1.0 filters to campaigns losing money. SUM(total_cost) > 100 ignores tiny test campaigns.
7b. Set up the alert
Open the question → click the bell icon (Alerts) → Create alert → choose "When results exist" (so you get alerted when any row is returned). Add your email. Metabase will run this question on a schedule (e.g. daily) and email you if there are rows.
Step 7 complete!
- Question "Low ROI Campaigns" created
- Alert set: email when results exist (ROI < 1.0)
- Marketing team gets notified automatically when campaigns underperform
Project complete!
You built a full marketing analytics pipeline from raw data to ROI dashboard. Here's what you can put on your resume:
- Designed data model for campaigns and conversions
- Built Python script for campaign data generation/ingestion
- Created DBT staging and mart models with ROI calculation
- Orchestrated pipeline with Airflow (ingest → dbt run → dbt test)
- Built Metabase dashboard with Spend by Channel, ROI by Campaign, Daily Spend Trend
- Configured Metabase alerts for low-ROI campaigns
What's next?
- Connect real APIs (Google Ads API, Facebook Marketing API) instead of generated data
- Add incremental DBT models to process only new data each day (see the sketch below)
- Try the E-Commerce Data Warehouse project for medallion architecture
- Add more marts: conversion funnel, attribution by channel
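For the incremental-models idea above, here's a minimal sketch of how mart_campaign_roi could be converted (requires dbt 1.1+ for the list-style unique_key; the date filter column is our assumption):

-- mart_campaign_roi.sql, incremental variant (sketch)
{{ config(
    materialized='incremental',
    unique_key=['campaign_id', 'channel', 'date'],
    schema='marts'
) }}

-- ...keep the same spend/revenue CTEs and final SELECT from Step 4,
-- then add this filter to the final SELECT, just before the ORDER BY:

{% if is_incremental() %}
  WHERE s.date > (SELECT MAX(date) FROM {{ this }})
{% endif %}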