Modern data platforms are not built on single scripts or ad‑hoc notebooks. They rely on reusable, well‑designed utility functions that handle extraction, transformation, auditing, and historical tracking in a consistent way.
This article walks through a real‑world Python utility module used in Microsoft Fabric, explaining every major function, what problem it solves, and how it fits into an enterprise‑grade data pipeline.
Overview: What This File Is Responsible For
This Python file acts as the backbone of a Fabric data pipeline, supporting:
- Reading and writing Delta Lake tables
- Executing SQL safely and efficiently
- Handling archive vs current data
- Seeding and staging tables
- Performing Slowly Changing Dimension (SCD Type 2) merges
- Writing audit and operational logs
- Supporting retries and failure handling
In short: this is the glue that makes automated data pipelines reliable, repeatable, and auditable.
Unique Identifier Generation
uuid_with_datetime()
Purpose:
Generates a UUID that embeds the current timestamp.
Why this matters for data engineers:
In distributed systems, UUIDs are often used as run IDs, batch IDs, or event identifiers. By embedding the timestamp:
- You can sort UUIDs chronologically
- You gain human‑readable traceability
- Debugging and log correlation become easier
How it works (high level):
- Generates a standard UUID v4
- Replaces part of it with `YYMMDDHHMMSS`
- Preserves valid UUID formatting
This is especially useful for pipeline run tracking and audit logs.
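The idea can be sketched as follows. This is a hypothetical reimplementation (the module's actual code isn't shown here), assuming the timestamp replaces the leading characters of a UUID v4:

```python
import uuid
from datetime import datetime

def uuid_with_datetime() -> str:
    """Embed a YYMMDDHHMMSS timestamp in a UUID-shaped string.

    Sketch only -- the real function may place the timestamp differently.
    """
    ts = datetime.now().strftime("%y%m%d%H%M%S")  # 12 digits, all valid hex
    base = uuid.uuid4().hex                       # 32 hex characters
    # Replace the first 12 hex characters with the timestamp,
    # then restore the standard 8-4-4-4-12 UUID layout.
    combined = ts + base[12:]
    return (f"{combined[:8]}-{combined[8:12]}-{combined[12:16]}-"
            f"{combined[16:20]}-{combined[20:]}")
```

Because the timestamp occupies the most significant characters, lexicographic sorting of these IDs matches chronological order.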
Reading Delta Tables Safely
delta_to_pandas(table_path)
Purpose:
Loads a Delta Lake table into a Pandas DataFrame using Fabric authentication.
Key details:
- Uses Fabric bearer tokens automatically
- Reads via PyArrow for performance
- Intended for:
- Metadata tables
- Small configuration datasets
- Lookups
When to use:
When the dataset is small and Pandas convenience is helpful.
delta_to_polars(table_path)
Purpose:
Loads a Delta Lake table into a Polars DataFrame.
Why Polars is used:
- Faster than Pandas
- Lower memory usage
- Better for large datasets
Design decision:
The function avoids converting Arrow → Pandas → Polars.
It reads Arrow directly into Polars, which is significantly more efficient.
Metadata & Catalog Integration
generate_datamart_dict()
Purpose:
Fetches Datamart metadata from the MyBMT API and returns it as a dictionary.
What it provides:
- Datamart name (lowercased)
- Internal IDs
- Parent schema relationships
Why this exists:
Rather than hardcoding schemas and IDs, the pipeline dynamically discovers them from the enterprise metadata catalog. This reduces errors and centralises governance.
Seed & Start Date Resolution
get_seed_and_start_dates(suffix, lh_silver_path, Datamart, Viewname)
Purpose:
Determines the correct seed date and start date for a pipeline run.
How it works:
- Special logic for certain regional suffixes (USD, CAD, EUR, NA)
- Reads from:
- PMO catalog tables
- Sales catalog tables
- Metadata configuration tables
Why this matters:
Different regions and domains often have different historical data availability. This function ensures each pipeline starts from the correct point in time.
SQL Rewriting for Archive Data
create_archive_query(sql_query, archive_date)
Purpose:
Automatically rewrites a SQL query so it reads from:
- Bronze storage (if the date is today)
- Archive storage (if historical)
What it does:
- Uses regex to find `FROM` and `JOIN` clauses
- Replaces table references with `OPENROWSET`
- Handles special cases (TVFs, MIS schema, `$` in table names)
- Rewrites `GETDATE()` to a fixed archive date
Why this is powerful:
One SQL file can be reused for current and historical replays without manual edits.
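A simplified sketch of the rewriting idea, using hypothetical helper names (the real `create_archive_query` also handles TVFs, the MIS schema, and other edge cases):

```python
import re

def pin_getdate(sql_query: str, archive_date: str) -> str:
    """Replace GETDATE() with a fixed archive-date literal."""
    return re.sub(r"GETDATE\(\)", f"'{archive_date}'", sql_query,
                  flags=re.IGNORECASE)

def rewrite_from_clauses(sql_query: str, archive_path: str) -> str:
    """Swap 'FROM schema.table' references for OPENROWSET reads
    against archive storage (illustrative pattern only)."""
    def repl(match: re.Match) -> str:
        table = match.group(1)
        return (f"FROM OPENROWSET(BULK '{archive_path}/{table}/*.parquet', "
                f"FORMAT = 'PARQUET') AS [{table}]")
    return re.sub(r"FROM\s+(\w+\.\w+)", repl, sql_query, flags=re.IGNORECASE)
```

The same source SQL file can then be pointed at today's data or at any archive date by changing only the rewrite parameters.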
Executing SQL Efficiently
execute_query(query, KeyVault)
Purpose:
Executes SQL via ODBC and returns a Polars DataFrame.
Key optimisations:
- Uses `fetchmany()` batching
- Converts directly to Arrow RecordBatches
- Avoids row‑by‑row Python loops
Why this approach was chosen:
This is close to the fastest possible method in pure Python when using ODBC.
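The batching pattern looks roughly like this. The real function runs over ODBC and converts each batch to an Arrow RecordBatch; stdlib `sqlite3` is used here only to keep the sketch self-contained:

```python
import sqlite3

def fetch_in_batches(cursor, batch_size: int = 10_000):
    """Yield result rows in batches via fetchmany(), avoiding
    row-by-row Python loops and full materialisation."""
    while True:
        batch = cursor.fetchmany(batch_size)
        if not batch:
            break
        yield batch

# Usage: stream a query result batch by batch
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (id INTEGER)")
conn.executemany("INSERT INTO t VALUES (?)", [(i,) for i in range(25)])
cur = conn.execute("SELECT id FROM t ORDER BY id")
batches = list(fetch_in_batches(cur, batch_size=10))
```

Each batch can then be handed to `pyarrow.RecordBatch.from_arrays()` (or similar) without ever holding the full result set as Python objects.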
Archive Data Retrieval
Create_archive_dataframe(...)
Purpose:
Central orchestration function that:
- Loads a SQL template
- Rewrites it for archive access
- Executes the query
- Falls back to archive‑specific SQL if needed
Why it matters:
This function ensures pipelines never fail silently when data moves from current to archive storage.
Seeding Delta Tables
seed_delta_table(...)
Purpose:
Creates a fresh Delta table in the silver layer using historical data.
What it handles:
- Retries (up to 10 attempts)
- Type corrections (e.g. `EXPIRY_DATE`)
- Delta writes using Arrow
- Success & failure logging
Enterprise value:
This function allows safe backfills, reprocessing, and environment resets with full auditability.
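The retry behaviour can be sketched with a small helper (`with_retries` is a hypothetical name; the real function inlines this logic):

```python
import time

def with_retries(operation, max_attempts: int = 10, base_delay: float = 1.0):
    """Run an operation with up to max_attempts tries.

    Re-raises the last exception if every attempt fails; sleeps with a
    simple linear backoff between attempts.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return operation()
        except Exception:
            if attempt == max_attempts:
                raise
            time.sleep(base_delay * attempt)  # linear backoff
```

Wrapping the Delta write in such a loop absorbs transient storage or authentication failures without failing the whole backfill.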
Staging Data
archive_to_stg(...)
Purpose:
Writes archive data into the staging (stg) layer.
Key responsibilities:
- Validates row counts
- Normalises date columns
- Overwrites staging tables safely
This function separates raw ingestion from business logic.
Primary Key Discovery
get_key_columns(table_name, lh_silver_path)
Purpose:
Determines the primary key columns for a table using metadata.
Why this matters:
SCD logic must never rely on hardcoded keys. This function ensures:
- Consistency across pipelines
- Centralised schema governance
- Safer SCD merges
Date Generation Utilities
generate_date_list(start_date)
Purpose:
Generates a dynamic list of dates for backfills and reprocessing.
Logic:
- Weekly intervals up to a cutoff date
- Then every 2 days
- Always includes today
This prevents unnecessary reprocessing while ensuring freshness.
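The cadence above can be sketched as follows (the `cutoff` and `today` parameters are assumptions for illustration; the real function likely derives them internally):

```python
from datetime import date, timedelta

def generate_date_list(start_date: date, cutoff: date, today: date) -> list:
    """Weekly steps up to a cutoff, then every 2 days, always ending on today."""
    dates = []
    current = start_date
    while current < cutoff:          # coarse weekly intervals for old history
        dates.append(current)
        current += timedelta(days=7)
    while current < today:           # finer 2-day intervals near the present
        dates.append(current)
        current += timedelta(days=2)
    if not dates or dates[-1] != today:
        dates.append(today)          # always include today
    return dates
```

Coarser intervals for older history keep backfill volumes manageable, while the 2-day cadence near the present preserves freshness.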
Slowly Changing Dimension (SCD Type 2)
scd_type_2(src, table_path, key_columns)
Purpose:
Implements SCD Type 2 using Delta Lake merges.
What it does:
- Deduplicates source data
- Identifies:
- New rows
- Modified rows
- Expires old records
- Inserts new versions
- Preserves history
Why this is enterprise‑grade:
- Uses Polars for performance
- Uses Delta Lake native `merge`
- Avoids Pandas entirely for large operations
- Returns detailed audit metrics
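The comparison step at the heart of SCD Type 2 can be sketched in pure Python (a stand-in for the Polars + Delta `merge` the real `scd_type_2` performs; column names like `end_date` are assumptions):

```python
def scd2_classify(src_rows, target_rows, key_columns):
    """Split source rows into (new, changed) relative to current target versions.

    New rows have a business key absent from the target; changed rows have a
    matching key but different non-key attributes.
    """
    def key_of(row):
        return tuple(row[k] for k in key_columns)

    # Index only current (unexpired) target versions by business key
    current = {key_of(r): r for r in target_rows if r.get("end_date") is None}

    new_rows, changed_rows = [], []
    for row in src_rows:
        existing = current.get(key_of(row))
        if existing is None:
            new_rows.append(row)
        else:
            attrs = [c for c in row if c not in key_columns]
            if any(row[c] != existing.get(c) for c in attrs):
                changed_rows.append(row)
    return new_rows, changed_rows
```

Changed rows then drive two writes in the merge: the old version is expired (its `end_date` set) and the new version inserted, preserving full history.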
Upsert with Logging
upsert_and_log(...)
Purpose:
Runs SCD logic and writes operational logs.
What gets logged:
- Success or failure
- Row counts
- Error messages
- Pipeline name
- Schema and table details
This is essential for compliance, monitoring, and support.
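A sketch of the kind of operational-log payload such a function might emit (the field names here are assumptions, not the module's actual schema):

```python
import json
import logging
from datetime import datetime, timezone

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("pipeline")

def build_log_record(pipeline, schema, table, success, row_counts, error=None):
    """Assemble a structured log record for one upsert run."""
    return {
        "logged_at": datetime.now(timezone.utc).isoformat(),
        "pipeline": pipeline,
        "schema": schema,
        "table": table,
        "status": "success" if success else "failure",
        "row_counts": row_counts,
        "error": error,
    }

record = build_log_record(
    pipeline="daily_sales", schema="silver", table="dim_customer",
    success=True, row_counts={"inserted": 120, "expired": 15},
)
log.info(json.dumps(record))
```

Emitting the record as structured JSON makes it queryable later, which is what turns pipeline logs into a usable audit trail.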