Building a Robust Data Engineering Utility Layer in Microsoft Fabric

Modern data platforms are not built on single scripts or ad‑hoc notebooks. They rely on reusable, well‑designed utility functions that handle extraction, transformation, auditing, and historical tracking in a consistent way.

This article walks through a real‑world Python utility module used in Microsoft Fabric, explaining every major function, what problem it solves, and how it fits into an enterprise‑grade data pipeline.


Overview: What This File Is Responsible For

This Python file acts as the backbone of a Fabric data pipeline, supporting:

  • Reading and writing Delta Lake tables
  • Executing SQL safely and efficiently
  • Handling archive vs current data
  • Seeding and staging tables
  • Performing Slowly Changing Dimension (SCD Type 2) merges
  • Writing audit and operational logs
  • Supporting retries and failure handling

In short: this is the glue that makes automated data pipelines reliable, repeatable, and auditable.


Unique Identifier Generation

uuid_with_datetime()

Purpose:
Generates a UUID that embeds the current timestamp.

Why this matters for data engineers:
In distributed systems, UUIDs are often used as run IDs, batch IDs, or event identifiers. By embedding the timestamp:

  • You can sort UUIDs chronologically
  • You gain human‑readable traceability
  • Debugging and log correlation become easier

How it works (high level):

  • Generates a standard UUID v4
  • Replaces part of it with YYMMDDHHMMSS
  • Preserves valid UUID formatting

This is especially useful for pipeline run tracking and audit logs.
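The helper can be sketched as follows. This is a hypothetical reconstruction; the exact placement of the timestamp digits may differ in the real module:

```python
import uuid
from datetime import datetime

def uuid_with_datetime() -> str:
    """Embed a YYMMDDHHMMSS timestamp in a UUID-shaped identifier."""
    stamp = datetime.now().strftime("%y%m%d%H%M%S")  # 12 digits: YYMMDDHHMMSS
    tail = uuid.uuid4().hex[12:]                     # keep 20 random hex chars
    merged = stamp + tail                            # digits are valid hex
    return (f"{merged[:8]}-{merged[8:12]}-{merged[12:16]}-"
            f"{merged[16:20]}-{merged[20:]}")
```

Because the first twelve characters are the timestamp, identifiers generated across runs sort chronologically as plain strings.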


Reading Delta Tables Safely

delta_to_pandas(table_path)

Purpose:
Loads a Delta Lake table into a Pandas DataFrame using Fabric authentication.

Key details:

  • Uses Fabric bearer tokens automatically
  • Reads via PyArrow for performance
  • Intended for:
    • Metadata tables
    • Small configuration datasets
    • Lookups

When to use:
When the dataset is small and Pandas convenience is helpful.


delta_to_polars(table_path)

Purpose:
Loads a Delta Lake table into a Polars DataFrame.

Why Polars is used:

  • Faster than Pandas
  • Lower memory usage
  • Better for large datasets

Design decision:
The function avoids converting Arrow → Pandas → Polars.
It reads Arrow directly into Polars, which is significantly more efficient.


Metadata & Catalog Integration

generate_datamart_dict()

Purpose:
Fetches Datamart metadata from the MyBMT API and returns it as a dictionary.

What it provides:

  • Datamart name (lowercased)
  • Internal IDs
  • Parent schema relationships

Why this exists:
Rather than hardcoding schemas and IDs, the pipeline dynamically discovers them from the enterprise metadata catalog. This reduces errors and centralises governance.


Seed & Start Date Resolution

get_seed_and_start_dates(suffix, lh_silver_path, Datamart, Viewname)

Purpose:
Determines the correct seed date and start date for a pipeline run.

How it works:

  • Special logic for certain regional suffixes (USD, CAD, EUR, NA)
  • Reads from:
    • PMO catalog tables
    • Sales catalog tables
    • Metadata configuration tables

Why this matters:
Different regions and domains often have different historical data availability. This function ensures each pipeline starts from the correct point in time.


SQL Rewriting for Archive Data

create_archive_query(sql_query, archive_date)

Purpose:
Automatically rewrites a SQL query so it reads from:

  • Bronze storage (if the date is today)
  • Archive storage (if historical)

What it does:

  • Uses regex to find FROM and JOIN clauses
  • Replaces table references with OPENROWSET
  • Handles special cases (TVFs, MIS schema, $ in table names)
  • Rewrites GETDATE() to a fixed archive date

Why this is powerful:
One SQL file can be reused for current and historical replays without manual edits.
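A heavily simplified sketch of the rewrite. The real function also covers TVFs, the MIS schema and $-suffixed names, and `ARCHIVE_ROOT` is a placeholder for the actual archive storage path:

```python
import re

def create_archive_query(sql_query: str, archive_date: str) -> str:
    # Pin GETDATE() to the archive date so date filters replay historically
    rewritten = re.sub(r"GETDATE\(\)", f"'{archive_date}'", sql_query,
                       flags=re.IGNORECASE)

    # Swap each schema.table reference after FROM/JOIN for an OPENROWSET
    pattern = re.compile(r"\b(FROM|JOIN)\s+(\w+)\.(\w+)\b", re.IGNORECASE)

    def to_openrowset(m: re.Match) -> str:
        schema, table = m.group(2), m.group(3)
        return (f"{m.group(1)} OPENROWSET(BULK 'ARCHIVE_ROOT/{schema}/{table}/"
                f"{archive_date}/*.parquet', FORMAT = 'PARQUET') AS {table}")

    return pattern.sub(to_openrowset, rewritten)
```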


Executing SQL Efficiently

execute_query(query, KeyVault)

Purpose:
Executes SQL via ODBC and returns a Polars DataFrame.

Key optimisations:

  • Uses fetchmany() batching
  • Converts directly to Arrow RecordBatches
  • Avoids row‑by‑row Python loops

Why this approach was chosen:
This is close to the fastest possible method in pure Python when using ODBC.


Archive Data Retrieval

Create_archive_dataframe(...)

Purpose:
Central orchestration function that:

  1. Loads a SQL template
  2. Rewrites it for archive access
  3. Executes the query
  4. Falls back to archive‑specific SQL if needed

Why it matters:
This function ensures pipelines never fail silently when data moves from current to archive storage.
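The fallback behaviour reduces to a pattern like the one below. The callables are injected to keep the sketch self-contained; the real function passes SQL templates and connection details instead:

```python
def run_with_archive_fallback(run_current, run_archive):
    """Try the rewritten current-storage query first; if it fails, retry
    against archive-specific SQL, and surface an error if both fail."""
    try:
        return run_current()
    except Exception:
        try:
            return run_archive()
        except Exception as err:
            # Never fail silently: make the double failure explicit
            raise RuntimeError("both current and archive queries failed") from err
```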


Seeding Delta Tables

seed_delta_table(...)

Purpose:
Creates a fresh Delta table in the silver layer using historical data.

What it handles:

  • Retries (up to 10 attempts)
  • Type corrections (e.g. EXPIRY_DATE)
  • Delta writes using Arrow
  • Success & failure logging

Enterprise value:
This function allows safe backfills, reprocessing, and environment resets with full auditability.
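The retry loop at the heart of the seeder can be sketched like this. The doubling back-off is an assumption; the real function also repairs column types and writes success/failure logs around each attempt:

```python
import time

def with_retries(operation, attempts: int = 10, base_delay: float = 5.0):
    """Run `operation` up to `attempts` times, backing off between tries."""
    last_err = None
    for attempt in range(1, attempts + 1):
        try:
            return operation()
        except Exception as err:
            last_err = err
            if attempt < attempts:
                time.sleep(base_delay * 2 ** (attempt - 1))
    raise last_err
```

The Delta write itself would then be wrapped as, for example, `with_retries(lambda: write_deltalake(table_path, arrow_table, mode="overwrite"))`.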


Staging Data

archive_to_stg(...)

Purpose:
Writes archive data into the staging (stg) layer.

Key responsibilities:

  • Validates row counts
  • Normalises date columns
  • Overwrites staging tables safely

This function separates raw ingestion from business logic.


Primary Key Discovery

get_key_columns(table_name, lh_silver_path)

Purpose:
Determines the primary key columns for a table using metadata.

Why this matters:
SCD logic must never rely on hardcoded keys. This function ensures:

  • Consistency across pipelines
  • Centralised schema governance
  • Safer SCD merges



Date Generation Utilities

generate_date_list(start_date)

Purpose:
Generates a dynamic list of dates for backfills and reprocessing.

Logic:

  • Weekly intervals up to a cutoff date
  • Then every 2 days
  • Always includes today

This prevents unnecessary reprocessing while ensuring freshness.
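The cadence described above might look like the following; the 60-day cutoff is an assumed value:

```python
from datetime import date, timedelta

def generate_date_list(start_date: date, cutoff_days: int = 60) -> list[date]:
    """Weekly steps for older history, 2-day steps near the present."""
    today = date.today()
    cutoff = today - timedelta(days=cutoff_days)
    dates, current = [], start_date
    while current < today:
        dates.append(current)
        current += timedelta(days=7 if current < cutoff else 2)
    dates.append(today)  # always include today for freshness
    return dates
```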


Slowly Changing Dimension (SCD Type 2)

scd_type_2(src, table_path, key_columns)

Purpose:
Implements SCD Type 2 using Delta Lake merges.

What it does:

  • Deduplicates source data
  • Identifies:
    • New rows
    • Modified rows
  • Expires old records
  • Inserts new versions
  • Preserves history

Why this is enterprise‑grade:

  • Uses Polars for performance
  • Uses Delta Lake native merge
  • Avoids Pandas entirely for large operations
  • Returns detailed audit metrics



Upsert with Logging

upsert_and_log(...)

Purpose:
Runs SCD logic and writes operational logs.

What gets logged:

  • Success or failure
  • Row counts
  • Error messages
  • Pipeline name
  • Schema and table details

This is essential for compliance, monitoring, and support.
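Structurally this is a try/except wrapper around the SCD call. In this sketch the log record is returned rather than appended to a Delta log table, and the field names are assumptions:

```python
from datetime import datetime

def upsert_and_log(run_scd, pipeline: str, schema: str, table: str) -> dict:
    record = {
        "PipelineName": pipeline, "SchemaName": schema, "TableName": table,
        "RunTimestamp": datetime.now().isoformat(),
        "Status": "Success", "RowCounts": None, "ErrorMessage": None,
    }
    try:
        # run_scd stands in for the scd_type_2 call and returns its metrics
        record["RowCounts"] = run_scd()
    except Exception as err:
        record["Status"] = "Failed"
        record["ErrorMessage"] = str(err)
    return record
```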
