Delta CDF Loader

The DeltaCDFLoader enables incremental data loading using Delta Lake's Change Data Feed (CDF) feature. This strategy captures all changes (inserts, updates, deletes) made to a source table, making it ideal for scenarios requiring complete change tracking.

Key Features

  • Change Data Feed tracking: Captures all table changes using Delta CDF metadata
  • Version-based loading: Uses commit versions to track progress
  • Automatic deduplication: Handles multiple changes to the same record
  • Multiple pipelines: Uses delta_load_identifier to support multiple loads from the same source table

Prerequisites

The source table must have Change Data Feed enabled:

ALTER TABLE your_table SET TBLPROPERTIES (delta.enableChangeDataFeed = true)

Metadata Management

The Delta CDF Loader maintains transaction history in a dedicated metadata table:

  • start_commit_ver: The starting commit version for the delta load
  • end_commit_ver: The ending commit version for the delta load
  • delta_load_identifier: Unique identifier to distinguish different loads from the same source
  • is_processed: Boolean flag indicating whether the load completed successfully
  • is_stale: Boolean flag indicating whether the metadata entry is outdated
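
The metadata columns above can be illustrated with plain Python. This is a minimal sketch, assuming the entries behave like the rows described in the table; the dict-based storage and the helper function are illustrative, not the library's internals:

```python
# Illustrative metadata entries for one delta load (plain dicts; the
# real loader persists these in a dedicated Delta metadata table).
metadata = [
    {
        "start_commit_ver": 0,
        "end_commit_ver": 5,
        "delta_load_identifier": "orders_pipeline",
        "is_processed": True,
        "is_stale": False,
    },
    {
        "start_commit_ver": 5,
        "end_commit_ver": 9,
        "delta_load_identifier": "orders_pipeline",
        "is_processed": False,  # still in flight
        "is_stale": False,
    },
]


def last_processed_version(entries, identifier):
    """Return the highest end_commit_ver among processed, non-stale
    entries for the given delta_load_identifier, or None."""
    versions = [
        e["end_commit_ver"]
        for e in entries
        if e["delta_load_identifier"] == identifier
        and e["is_processed"]
        and not e["is_stale"]
    ]
    return max(versions) if versions else None
```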

Multiple Pipeline Support

The delta_load_identifier allows you to run multiple delta loads from the same source table for different use cases or pipelines without conflicts.

Quick Start Guide

This example demonstrates how to set up incremental loading using the CDF strategy.

from cloe_nessy.integration.delta_loader import DeltaLoadOptions, DeltaLoaderFactory
from cloe_nessy.integration.writer import DeltaMergeWriter


# `source_table` and `target_table` in this example are assumed to be
# existing table objects exposing an `identifier` attribute.

# Create a Writer
writer = DeltaMergeWriter()

# Configure CDF Delta Load
cdf_delta_load_options = DeltaLoadOptions(
    strategy="CDF",
    delta_load_identifier="example_delta_load_identifier",
    strategy_options={
        "deduplication_columns": ["id_col"],
    },
)

# Create the Loader
loader = DeltaLoaderFactory.create_loader(
    table_identifier=source_table.identifier,
    options=cdf_delta_load_options,
)

# Read Data Using Delta Load Strategy
df = loader.get_data()

# Write Data to Target Table (using merge for updates)
writer.write(
    table=target_table,
    data_frame=df,
    key_columns=["id_col"],
)

# Mark Delta Load as Processed
loader.consume_data()

Configuration Options

  • strategy (str): Must be set to "CDF"
  • delta_load_identifier (str): Unique identifier for this delta load
  • deduplication_columns (list[str]): Columns used to deduplicate records
  • enable_full_load (bool, optional): Perform an initial full table load if no metadata exists for the table
  • start_version (int, optional): Custom starting commit version
  • end_version (int, optional): Custom ending commit version

Processing Steps

  1. Initialize: Create the delta loader and configure CDF options
  2. Load: Read incremental changes using commit versions
  3. Deduplicate: Remove duplicate records based on key columns
  4. Process: Apply any necessary transformations
  5. Merge: Update the target table with the changes
  6. Commit: Mark the delta load as processed

Delete Operations Limitation

The Delta CDF Loader does not currently support deletes on the target table. Rows deleted in the source table are still written to (or left in) the target table. You can identify the type of each change via the _change_type column of the returned DataFrame.
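
Because deletes are passed through, you may want to filter them out, or route them to a custom delete handler, before writing. A plain-Python sketch of splitting rows by _change_type (in practice you would apply the equivalent filter to the Spark DataFrame):

```python
# Example CDF rows as plain dicts; _change_type values follow the
# Delta CDF convention (insert, update_postimage, delete, ...).
rows = [
    {"id_col": 1, "value": "a", "_change_type": "insert"},
    {"id_col": 2, "value": "b", "_change_type": "update_postimage"},
    {"id_col": 3, "value": "c", "_change_type": "delete"},
]

# Split changes into upserts and deletes so deletes can be handled
# separately (e.g. with a custom DELETE against the target table).
upserts = [r for r in rows if r["_change_type"] != "delete"]
deletes = [r for r in rows if r["_change_type"] == "delete"]
```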

Error Handling

If any write operation fails, you must revert changes to previously written tables to maintain data consistency. For complex multi-table writes, consider handling the writing and data consumption manually.
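
One way to structure this is to mark the load as consumed only after every write has succeeded, and to revert already-written tables on failure. A sketch of that pattern with hypothetical stand-ins (write_table, revert_table, and consume are placeholders, not library functions):

```python
def run_delta_load(df, targets, write_table, revert_table, consume):
    """Write df to each target table; on failure, revert the targets
    that were already written, then re-raise. consume() is called
    only once every write has succeeded."""
    written = []
    try:
        for target in targets:
            write_table(target, df)
            written.append(target)
        consume()  # mark the delta load as processed
    except Exception:
        for target in reversed(written):
            revert_table(target)  # undo partial writes
        raise
```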

Technical Implementation

This section details the internal workflow of the DeltaCDFLoader.

Processing Workflow

flowchart TD
    A[Start] --> B[Get Commit Versions]
    B --> C{Metadata?}
    C -->|Yes| D[Get Last Version]
    C -->|No| E[Use Min/Max Versions]
    D --> F[Set Version Range]
    E --> F
    F --> G{Same Version?}
    G -->|Yes| H[Return Empty]
    G -->|No| I[Read CDF Data]
    I --> J[Deduplicate]
    J --> K[Strip CDF Metadata]
    K --> L[Register Op]
    L --> M[Transform]
    M --> N[Merge Write]
    N --> O{Success?}
    O -->|Yes| P[Mark Done]
    O -->|No| Q[Revert]
    P --> R[End]
    Q --> S[Error]
    H --> R

    %% Dark/Light mode compatible styling
    classDef startEnd fill:#3b82f6,stroke:#1e40af,stroke-width:2px,color:#ffffff
    classDef decision fill:#f59e0b,stroke:#d97706,stroke-width:2px,color:#ffffff
    classDef process fill:#8b5cf6,stroke:#7c3aed,stroke-width:2px,color:#ffffff
    classDef success fill:#10b981,stroke:#059669,stroke-width:2px,color:#ffffff
    classDef error fill:#ef4444,stroke:#dc2626,stroke-width:2px,color:#ffffff
    classDef empty fill:#6b7280,stroke:#4b5563,stroke-width:2px,color:#ffffff

    class A,R,S startEnd
    class C,G,O decision
    class B,D,E,F,I,J,K,L,M,N process
    class P success
    class Q error
    class H empty

Step-by-Step Process

Step 1: Determine Commit Version Range

Version Discovery

  • Min Version: Latest CREATE, TRUNCATE, or CDF enablement version
  • Max Version: Latest version in table history

Metadata Check

  • Has entries? → Use end_commit_ver from the last processed load
  • No entries? → Use the discovered min/max versions as the range
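
The decision above can be sketched in plain Python (the function name and return shape are illustrative, not the library's API):

```python
def determine_version_range(last_end_ver, min_ver, max_ver):
    """Pick the commit range for the next load.

    last_end_ver: end_commit_ver of the last processed metadata
    entry, or None when no metadata exists yet."""
    if last_end_ver is None:
        # First run: use the discovered min/max versions.
        return min_ver, max_ver
    # Subsequent runs continue from the last processed version.
    return last_end_ver, max_ver
```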

Step 2: Read Change Data

Version Comparison

  • Same versions? → Return an empty DataFrame (no changes)
  • Different versions? → Read CDF data from start_version + 1 to end_version

CDF Processing

  • Include Delta CDF metadata columns during read

Step 3: Data Processing

Deduplication (Optional)

  • Remove duplicate records based on deduplication_columns
  • Keep only the latest change per unique key
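
Keeping only the latest change per key can be sketched with plain dicts, ordering by Delta's _commit_version column (the real loader performs the equivalent operation on a Spark DataFrame):

```python
def deduplicate(rows, key_columns, version_column="_commit_version"):
    """Keep the row with the highest commit version per unique key."""
    latest = {}
    for row in rows:
        key = tuple(row[c] for c in key_columns)
        if key not in latest or row[version_column] > latest[key][version_column]:
            latest[key] = row
    return list(latest.values())


changes = [
    {"id_col": 1, "value": "old", "_commit_version": 4},
    {"id_col": 1, "value": "new", "_commit_version": 7},
    {"id_col": 2, "value": "x", "_commit_version": 5},
]
```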

Metadata Cleanup

  • Strip CDF metadata columns before writing to target
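
Stripping the CDF metadata amounts to dropping the underscore-prefixed columns Delta adds to CDF reads. A plain-Python equivalent (on a Spark DataFrame this would be a df.drop of the same columns):

```python
# Metadata columns Delta adds to Change Data Feed reads.
CDF_COLUMNS = {"_change_type", "_commit_version", "_commit_timestamp"}


def strip_cdf_columns(row):
    """Drop Delta CDF metadata columns before writing to the target."""
    return {k: v for k, v in row.items() if k not in CDF_COLUMNS}
```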

Step 4: Register and Write

Metadata Registration

Create new metadata entry with:

  • start_commit_ver and end_commit_ver
  • is_processed: Set to false initially

Target Update

  • Apply transformations as needed
  • Use merge operations to handle inserts and updates

Step 5: Finalize Transaction

Success Path

  • Mark metadata entry as is_processed = true
  • Transaction complete

Failure Path

  • Revert any applied changes (this must be handled manually)

Best Practices

  • Enable CDF on source tables before starting delta loads
  • Use appropriate deduplication columns for your use case
  • Monitor commit version gaps in your metadata table
  • Test delta load logic with small datasets first
  • Consider implementing custom delete handling if required