Building Your First Data Warehouse in Databricks — End to End 🎉
This is it. The article the entire series has been building toward.
We've covered Databricks fundamentals, Apache Spark, Delta Lake, DBFS, DataFrames, SQL, and the Medallion Architecture. Now we wire everything together into a real, working data warehouse — from raw data ingestion all the way to queryable Gold tables.
By the end of this article you'll have a functioning Lakehouse with Bronze, Silver, and Gold layers, a database registered in the Databricks catalog, and the ability to query your warehouse like a real data engineer.
Let's build it.
What We're Building
We'll build a Sales Data Warehouse using a publicly available dataset. Here's the full architecture:
CSV Files (raw sales data)
↓
🥉 BRONZE
bronze.sales_raw
Raw Delta table, append-only
↓
🥈 SILVER
silver.sales
Cleaned, deduplicated, enriched
↓
🥇 GOLD
gold.monthly_revenue — Revenue by region and month
gold.product_performance — Top products by sales volume
gold.customer_segments — Customers segmented by spend tier
↓
SQL queries / BI tool
Step 0: The Dataset
We'll use the Online Retail dataset — a real e-commerce transaction dataset available in Databricks sample data.
It contains ~540,000 rows of UK retail transactions with these columns:
| Column | Type | Description |
|---|---|---|
| InvoiceNo | String | Order ID |
| StockCode | String | Product code |
| Description | String | Product name |
| Quantity | Integer | Units ordered |
| InvoiceDate | String | Order date and time |
| UnitPrice | Double | Price per unit |
| CustomerID | Double | Customer identifier |
| Country | String | Customer country |
Step 1: Set Up Your Databases
Start a new notebook. This will be your setup notebook — run it once to create the structure.
# notebook: 00_setup
# Create the three layer databases
spark.sql("CREATE DATABASE IF NOT EXISTS bronze")
spark.sql("CREATE DATABASE IF NOT EXISTS silver")
spark.sql("CREATE DATABASE IF NOT EXISTS gold")
# Create the DBFS storage directories for each layer
dbutils.fs.mkdirs("/mnt/warehouse/bronze/")
dbutils.fs.mkdirs("/mnt/warehouse/silver/")
dbutils.fs.mkdirs("/mnt/warehouse/gold/")
print("✅ Databases and directories created.")
Now check the Databricks Data tab — you should see three new databases: bronze, silver, and gold.
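You can also verify from code rather than the UI, using the catalog API:

```python
# Programmatic check that the three layer databases exist
existing = {db.name for db in spark.catalog.listDatabases()}
missing = {"bronze", "silver", "gold"} - existing
assert not missing, f"Setup incomplete, missing databases: {missing}"
print("✅ All three layer databases present.")
```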
Step 2: Bronze — Ingest Raw Data
Create a new notebook: 01_bronze_ingestion
# notebook: 01_bronze_ingestion
from pyspark.sql.functions import current_timestamp, input_file_name, lit
print("Starting Bronze ingestion...")
# -------------------------------------------------------
# Read the raw CSV from Databricks sample datasets
# -------------------------------------------------------
raw_df = spark.read.csv(
"/databricks-datasets/online_retail/data-001/data.csv",
header=True,
inferSchema=True
)
print(f"Raw rows ingested: {raw_df.count():,}")
raw_df.printSchema()
# -------------------------------------------------------
# Add Bronze metadata columns
# -------------------------------------------------------
bronze_df = raw_df \
.withColumn("_ingested_at", current_timestamp()) \
.withColumn("_source_file", input_file_name()) \
.withColumn("_source_system", lit("online_retail_csv"))
# -------------------------------------------------------
# Write to Bronze Delta table
# -------------------------------------------------------
bronze_df.write \
.format("delta") \
.mode("overwrite") \
.option("overwriteSchema", "true") \
.save("/mnt/warehouse/bronze/sales_raw/")
# Register in catalog
spark.sql("""
CREATE TABLE IF NOT EXISTS bronze.sales_raw
USING DELTA
LOCATION '/mnt/warehouse/bronze/sales_raw/'
""")
# Quick validation
count = spark.read.format("delta").load("/mnt/warehouse/bronze/sales_raw/").count()
print(f"✅ Bronze table written. Total rows: {count:,}")
Run the cell. You should see output similar to:
Raw rows ingested: 541,909
✅ Bronze table written. Total rows: 541,909
Let's peek at what we landed:
display(spark.read.table("bronze.sales_raw").limit(10))
You'll see messy data — nulls in CustomerID, negative quantities (returns), zero-price rows. That's fine. Bronze captures reality. Silver fixes it.
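You can quantify the mess with a quick profile of the Bronze table before writing any cleaning rules:

```python
from pyspark.sql.functions import col

bronze = spark.read.table("bronze.sales_raw")

print(f"Null CustomerID rows: {bronze.filter(col('CustomerID').isNull()).count():,}")
print(f"Negative quantities:  {bronze.filter(col('Quantity') < 0).count():,}")
print(f"Zero-price rows:      {bronze.filter(col('UnitPrice') <= 0).count():,}")
```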
Step 3: Silver — Clean and Enrich
Create a new notebook: 02_silver_transformation
# notebook: 02_silver_transformation
from pyspark.sql.functions import (
col, upper, trim, round, to_timestamp,
year, month, when, current_timestamp
)
print("Starting Silver transformation...")
# -------------------------------------------------------
# Read from Bronze
# -------------------------------------------------------
bronze = spark.read.table("bronze.sales_raw")
print(f"Bronze rows: {bronze.count():,}")
# -------------------------------------------------------
# Cleaning rules
# -------------------------------------------------------
silver = (
    bronze

    # 1. Drop rows with null CustomerID (anonymous sessions)
    .dropna(subset=["CustomerID"])

    # 2. Drop duplicates on InvoiceNo + StockCode
    .dropDuplicates(["InvoiceNo", "StockCode"])

    # 3. Remove returns (negative quantities) and zero-price items
    .filter(col("Quantity") > 0)
    .filter(col("UnitPrice") > 0)

    # 4. Cast and clean types
    .withColumn("CustomerID", col("CustomerID").cast("integer"))
    .withColumn("InvoiceDate", to_timestamp(col("InvoiceDate"), "M/d/yyyy H:mm"))
    .withColumn("UnitPrice", round(col("UnitPrice"), 2))

    # 5. Derive new columns
    .withColumn("TotalAmount", round(col("Quantity") * col("UnitPrice"), 2))
    .withColumn("Description", upper(trim(col("Description"))))
    .withColumn("Year", year(col("InvoiceDate")))
    .withColumn("Month", month(col("InvoiceDate")))
    .withColumn("Tier",
        when(col("TotalAmount") >= 500, "High Value")
        .when(col("TotalAmount") >= 100, "Mid Value")
        .otherwise("Low Value")
    )

    # 6. Rename to snake_case
    .withColumnRenamed("InvoiceNo", "invoice_id")
    .withColumnRenamed("StockCode", "product_code")
    .withColumnRenamed("Description", "product_name")
    .withColumnRenamed("Quantity", "quantity")
    .withColumnRenamed("InvoiceDate", "invoice_date")
    .withColumnRenamed("UnitPrice", "unit_price")
    .withColumnRenamed("CustomerID", "customer_id")
    .withColumnRenamed("Country", "country")
    .withColumnRenamed("TotalAmount", "total_amount")
    .withColumnRenamed("Year", "year")
    .withColumnRenamed("Month", "month")
    .withColumnRenamed("Tier", "tier")

    # 7. Drop Bronze metadata
    .drop("_ingested_at", "_source_file", "_source_system")

    # 8. Add Silver metadata
    .withColumn("_processed_at", current_timestamp())
)
print(f"Silver rows after cleaning: {silver.count():,}")
# -------------------------------------------------------
# Write to Silver Delta table
# -------------------------------------------------------
silver.write \
.format("delta") \
.mode("overwrite") \
.option("overwriteSchema", "true") \
.partitionBy("year", "month") \
.save("/mnt/warehouse/silver/sales/")
spark.sql("""
CREATE TABLE IF NOT EXISTS silver.sales
USING DELTA
LOCATION '/mnt/warehouse/silver/sales/'
""")
print("✅ Silver table written.")
display(spark.read.table("silver.sales").limit(5))
Expected output:
Bronze rows: 541,909
Silver rows after cleaning: 397,924
✅ Silver table written.
We dropped ~144,000 rows — nulls, returns, zero-price items. What remains is clean, trusted data.
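If you want to confirm that figure yourself, you can replay the same rules against Bronze and count the survivors. Note the rules overlap (a return can also be an anonymous session), so per-rule counts won't sum neatly to the total:

```python
from pyspark.sql.functions import col

bronze = spark.read.table("bronze.sales_raw")
total = bronze.count()

survivors = (bronze
    .dropna(subset=["CustomerID"])
    .dropDuplicates(["InvoiceNo", "StockCode"])
    .filter((col("Quantity") > 0) & (col("UnitPrice") > 0))
    .count())

print(f"Dropped: {total - survivors:,} of {total:,} ({(total - survivors) / total:.1%})")
```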
Step 4: Gold — Build Business Tables
Create a new notebook: 03_gold_aggregations
We'll build three Gold tables.
Gold Table 1: Monthly Revenue by Country
# notebook: 03_gold_aggregations
from pyspark.sql.functions import sum, count, avg, countDistinct, round, col, when
silver = spark.read.table("silver.sales")
# -------------------------------------------------------
# Gold 1: Monthly Revenue by Country
# -------------------------------------------------------
monthly_revenue = silver \
.groupBy("year", "month", "country") \
.agg(
round(sum("total_amount"), 2).alias("total_revenue"),
count("invoice_id").alias("total_orders"),
round(avg("total_amount"), 2).alias("avg_order_value"),
countDistinct("customer_id").alias("unique_customers")
) \
.orderBy("year", "month", "total_revenue", ascending=[True, True, False])
monthly_revenue.write \
.format("delta") \
.mode("overwrite") \
.option("overwriteSchema", "true") \
.save("/mnt/warehouse/gold/monthly_revenue/")
spark.sql("""
CREATE TABLE IF NOT EXISTS gold.monthly_revenue
USING DELTA
LOCATION '/mnt/warehouse/gold/monthly_revenue/'
""")
print("✅ gold.monthly_revenue written.")
display(monthly_revenue.limit(10))
Gold Table 2: Product Performance
# -------------------------------------------------------
# Gold 2: Product Performance
# -------------------------------------------------------
product_performance = silver \
.groupBy("product_code", "product_name") \
.agg(
round(sum("total_amount"), 2).alias("total_revenue"),
sum("quantity").alias("units_sold"),
count("invoice_id").alias("times_ordered"),
countDistinct("customer_id").alias("unique_buyers"),
round(avg("unit_price"), 2).alias("avg_unit_price")
) \
.orderBy("total_revenue", ascending=False)
product_performance.write \
.format("delta") \
.mode("overwrite") \
.option("overwriteSchema", "true") \
.save("/mnt/warehouse/gold/product_performance/")
spark.sql("""
CREATE TABLE IF NOT EXISTS gold.product_performance
USING DELTA
LOCATION '/mnt/warehouse/gold/product_performance/'
""")
print("✅ gold.product_performance written.")
display(product_performance.limit(10))
Gold Table 3: Customer Segments
# -------------------------------------------------------
# Gold 3: Customer Segments
# -------------------------------------------------------
customer_segments = silver \
.groupBy("customer_id", "country") \
.agg(
round(sum("total_amount"), 2).alias("lifetime_value"),
count("invoice_id").alias("total_orders"),
round(avg("total_amount"), 2).alias("avg_order_value"),
countDistinct("product_code").alias("unique_products_bought")
) \
.withColumn("segment",
when(col("lifetime_value") >= 5000, "VIP")
.when(col("lifetime_value") >= 1000, "Loyal")
.when(col("lifetime_value") >= 200, "Regular")
.otherwise("Occasional")
) \
.orderBy("lifetime_value", ascending=False)
customer_segments.write \
.format("delta") \
.mode("overwrite") \
.option("overwriteSchema", "true") \
.save("/mnt/warehouse/gold/customer_segments/")
spark.sql("""
CREATE TABLE IF NOT EXISTS gold.customer_segments
USING DELTA
LOCATION '/mnt/warehouse/gold/customer_segments/'
""")
print("✅ gold.customer_segments written.")
display(customer_segments.limit(10))
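Before leaving the notebook, it's worth a quick check that all three Gold tables registered in the catalog:

```python
# All three Gold tables should appear here
display(spark.sql("SHOW TABLES IN gold"))
```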
Step 5: Query Your Data Warehouse
Open the SQL Editor in Databricks. Your warehouse is live. Start querying.
-- What were the top 5 revenue months?
SELECT
year,
month,
SUM(total_revenue) AS monthly_revenue,
SUM(total_orders) AS monthly_orders,
SUM(unique_customers) AS monthly_customers
FROM gold.monthly_revenue
GROUP BY year, month
ORDER BY monthly_revenue DESC
LIMIT 5;
-- What are the top 10 best-selling products?
SELECT
product_name,
total_revenue,
units_sold,
unique_buyers
FROM gold.product_performance
LIMIT 10;
-- How are customers distributed by segment?
SELECT
segment,
COUNT(*) AS customer_count,
ROUND(AVG(lifetime_value), 2) AS avg_lifetime_value,
ROUND(AVG(total_orders), 1) AS avg_orders
FROM gold.customer_segments
GROUP BY segment
ORDER BY avg_lifetime_value DESC;
-- Which countries generate the most revenue?
SELECT
country,
ROUND(SUM(total_revenue), 2) AS total_revenue,
SUM(total_orders) AS total_orders
FROM gold.monthly_revenue
GROUP BY country
ORDER BY total_revenue DESC
LIMIT 10;
You're querying a real data warehouse. Built by you. From scratch.
Step 6: Validate Your Warehouse
Good data engineers always validate. Run these checks before calling it done:
# notebook: 04_validation
print("=== DATA WAREHOUSE VALIDATION ===\n")
# Row counts across layers
bronze_count = spark.read.table("bronze.sales_raw").count()
silver_count = spark.read.table("silver.sales").count()
print(f"🥉 Bronze rows: {bronze_count:>10,}")
print(f"🥈 Silver rows: {silver_count:>10,} ({silver_count/bronze_count:.1%} of bronze)")
print()
# Gold table counts
for table in ["gold.monthly_revenue", "gold.product_performance", "gold.customer_segments"]:
count = spark.table(table).count()
print(f"🥇 {table}: {count:,} rows")
print()
# Null checks on Silver
from pyspark.sql.functions import col, sum as spark_sum
silver = spark.read.table("silver.sales")
null_counts = silver.select([
spark_sum(col(c).isNull().cast("int")).alias(c)
for c in ["invoice_id", "customer_id", "total_amount", "invoice_date"]
])
print("Null counts on critical Silver columns:")
display(null_counts)
# Revenue sanity check
total_revenue = silver.agg({"total_amount": "sum"}).collect()[0][0]
print(f"\nTotal Silver revenue: £{total_revenue:,.2f}")
print("\n✅ Validation complete.")
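One more check worth adding: Gold should reconcile with Silver. Since gold.monthly_revenue is just a re-aggregation of silver.sales, their revenue totals must match (to within the per-group rounding applied in Gold):

```python
from pyspark.sql.functions import sum as spark_sum

silver_total = spark.read.table("silver.sales").agg(spark_sum("total_amount")).collect()[0][0]
gold_total = spark.read.table("gold.monthly_revenue").agg(spark_sum("total_revenue")).collect()[0][0]

# Tolerance covers the round(..., 2) applied per group in the Gold build
assert abs(silver_total - gold_total) < 1.0, \
    f"Revenue mismatch: silver={silver_total:,.2f} vs gold={gold_total:,.2f}"
print(f"✅ Gold reconciles with Silver: £{gold_total:,.2f}")
```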
Step 7: Optimize Your Tables
Now that everything is built, run maintenance on your Gold tables for faster queries:
%sql
-- OPTIMIZE compacts small files; adding ZORDER BY also co-locates rows
-- on common filter columns, so one command does both jobs per table
OPTIMIZE gold.monthly_revenue ZORDER BY (year, month, country);
OPTIMIZE gold.product_performance ZORDER BY (total_revenue);
OPTIMIZE gold.customer_segments ZORDER BY (segment, country);
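While you're at it, Delta can also clean up the data files orphaned by all those overwrite runs. VACUUM permanently deletes old files, so the sketch below keeps the default 7-day retention rather than shortening it:

```python
# Remove files no longer referenced by each Gold table (default retention: 7 days)
for table in ["gold.monthly_revenue", "gold.product_performance", "gold.customer_segments"]:
    spark.sql(f"VACUUM {table}")
    print(f"🧹 Vacuumed {table}")
```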
What You've Built
Let's look at the complete picture:
📁 Databases created:
bronze / silver / gold
📄 Tables created:
bronze.sales_raw — 541,909 rows (raw, as-is)
silver.sales — 397,924 rows (clean, enriched)
gold.monthly_revenue — aggregated by year/month/country
gold.product_performance — aggregated by product
gold.customer_segments — aggregated by customer
🏗️ Architecture:
Medallion (Bronze → Silver → Gold)
All tables in Delta format
Silver partitioned by year/month
Gold tables compacted and Z-ordered via OPTIMIZE
🔍 Queryable via:
Databricks SQL Editor
Any BI tool via JDBC/ODBC connector
Databricks notebooks
Where to Go From Here
You've built your first data warehouse in Databricks. Here's what to explore next:
Orchestration: Take your four notebooks and wire them into a Databricks Workflow — a pipeline that runs Bronze → Silver → Gold automatically on a schedule or trigger (see the driver-notebook sketch after this list).
Incremental loads: Update the Bronze ingestion to load only new files, and update Silver to use MERGE instead of overwrite — real production pipelines are incremental (a MERGE sketch follows below).
Unity Catalog: In production Databricks, Unity Catalog provides centralized access control, data lineage, and governance across all your tables.
Databricks SQL Warehouses: Connect Power BI, Tableau, or Looker directly to your Gold tables via a SQL Warehouse endpoint.
dbt on Databricks: Use dbt to manage your Silver and Gold transformations with version control, testing, and documentation built in.
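For a taste of orchestration before building a full Workflow, a driver notebook can chain the other four with dbutils.notebook.run. A minimal sketch, assuming the four notebooks sit in the same folder as the driver:

```python
# notebook: run_pipeline, runs each stage in order, failing fast on any error
for nb in ["01_bronze_ingestion", "02_silver_transformation",
           "03_gold_aggregations", "04_validation"]:
    print(f"▶ Running {nb}...")
    dbutils.notebook.run(nb, 3600)  # 3600 s timeout per notebook

print("✅ Pipeline complete.")
```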
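And for a flavor of the incremental Silver load, here's a hedged sketch of a Delta MERGE using the DeltaTable API. It assumes `new_batch` is an already-cleaned DataFrame of freshly arrived rows, keyed on invoice_id + product_code to match the dedup rule used above:

```python
from delta.tables import DeltaTable

# Upsert the new batch into Silver instead of overwriting the whole table.
# new_batch is assumed to be a cleaned DataFrame of newly arrived rows.
silver_table = DeltaTable.forName(spark, "silver.sales")

(silver_table.alias("t")
    .merge(
        new_batch.alias("s"),
        "t.invoice_id = s.invoice_id AND t.product_code = s.product_code"
    )
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute())
```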
Series Complete 🎉
You went from zero to a working data warehouse in Databricks. That's not a small thing.