Delta CDF Loader¶
The DeltaCDFLoader enables incremental data loading using Delta Lake's Change Data Feed (CDF) feature. This strategy captures all changes (inserts, updates, deletes) made to a source table, making it ideal for scenarios requiring complete change tracking.
Key Features
- Change Data Feed tracking: Captures all table changes using Delta CDF metadata
- Version-based loading: Uses commit versions to track progress
- Automatic deduplication: Handles multiple changes to the same record
- Multiple pipelines: Uses `delta_load_identifier` to support multiple loads from the same source table
Prerequisites
The source table must have Change Data Feed (CDF) enabled before the loader can read change data.
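CDF is enabled through a standard Delta Lake table property; the catalog, schema, and table names below are placeholders:

```sql
-- Enable CDF on an existing table
ALTER TABLE my_catalog.my_schema.source_table
SET TBLPROPERTIES (delta.enableChangeDataFeed = true);

-- Or enable it at creation time
CREATE TABLE my_catalog.my_schema.source_table (id_col INT, value STRING)
TBLPROPERTIES (delta.enableChangeDataFeed = true);
```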
Metadata Management¶
The Delta CDF Loader maintains transaction history in a dedicated metadata table:
| Column | Description | 
|---|---|
| start_commit_ver | The starting commit version for the delta load | 
| end_commit_ver | The ending commit version for the delta load | 
| delta_load_identifier | Unique identifier to distinguish different loads from the same source | 
| is_processed | Boolean flag indicating if the load completed successfully | 
| is_stale | Boolean flag indicating if the metadata entry is outdated | 
Multiple Pipeline Support
The delta_load_identifier allows you to run multiple delta loads from the same source table for different use cases or pipelines without conflicts.
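Conceptually, each pipeline's progress is tracked independently by filtering the metadata table on its identifier. The sketch below illustrates this with plain Python dictionaries mirroring the metadata columns; it is not the library's internal code, and the identifier values are made up:

```python
# Illustrative metadata rows, keyed by delta_load_identifier (hypothetical values)
metadata = [
    {"delta_load_identifier": "bronze_to_silver", "end_commit_ver": 12, "is_processed": True},
    {"delta_load_identifier": "silver_to_gold", "end_commit_ver": 7, "is_processed": True},
    {"delta_load_identifier": "bronze_to_silver", "end_commit_ver": 15, "is_processed": True},
]

def last_processed_version(rows, identifier):
    """Return the highest end_commit_ver among processed entries for one pipeline."""
    versions = [
        r["end_commit_ver"]
        for r in rows
        if r["delta_load_identifier"] == identifier and r["is_processed"]
    ]
    return max(versions) if versions else None

print(last_processed_version(metadata, "bronze_to_silver"))  # 15
print(last_processed_version(metadata, "silver_to_gold"))    # 7
```

Because each pipeline only reads its own rows, two loads from the same source table never interfere with each other's version tracking.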
Quick Start Guide¶
This example demonstrates how to set up incremental loading using the CDF strategy.
```python
from cloe_nessy.integration.delta_loader import DeltaLoadOptions, DeltaLoaderFactory
from cloe_nessy.integration.writer import DeltaMergeWriter

# `source_table` and `target_table` are assumed to be table objects defined elsewhere.

# Create a writer
writer = DeltaMergeWriter()

# Configure the CDF delta load
cdf_delta_load_options = DeltaLoadOptions(
    strategy="CDF",
    delta_load_identifier="example_delta_load_identifier",
    strategy_options={
        "deduplication_columns": ["id_col"],
    },
)

# Create the loader
loader = DeltaLoaderFactory.create_loader(
    table_identifier=source_table.identifier,
    options=cdf_delta_load_options,
)

# Read data using the delta load strategy
df = loader.get_data()

# Write data to the target table (using merge for updates)
writer.write(
    table=target_table,
    data_frame=df,
    key_columns=["id_col"],
)

# Mark the delta load as processed
loader.consume_data()
```
| Parameter | Type | Description | 
|---|---|---|
| strategy | str | Must be set to "CDF" | 
| delta_load_identifier | str | Unique identifier for this delta load | 
| deduplication_columns | list[str] | Columns used for deduplicating records | 
| enable_full_load | bool (optional) | Perform an initial full table load when no metadata exists for the table | 
| start_version | int (optional) | Custom starting commit version | 
| end_version | int (optional) | Custom ending commit version | 
- Initialize: Create delta manager and configure CDF options
- Load: Read incremental changes using commit versions
- Deduplicate: Remove duplicate records based on key columns
- Process: Apply any necessary transformations
- Merge: Update target table with changes
- Commit: Mark the delta load as processed
Delete Operations Limitation
The Delta CDF Loader currently does not apply deletes to the target
table. Rows deleted in the source table will still be written to the
target table, or remain there. You can identify the type of each change
via the `_change_type` column of the returned DataFrame.
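If you need delete handling, one option is to split the returned changes on `_change_type` before writing and process the deletes yourself. The sketch below uses plain Python rows for illustration; on a real DataFrame the equivalent would be a filter on the same column:

```python
# Sketch: separating deletes from upserts using the _change_type column.
# The column name comes from Delta CDF; the row data here is illustrative.
rows = [
    {"id_col": 1, "value": "a", "_change_type": "insert"},
    {"id_col": 2, "value": "b", "_change_type": "delete"},
    {"id_col": 3, "value": "c", "_change_type": "update_postimage"},
]

upserts = [r for r in rows if r["_change_type"] != "delete"]
deletes = [r["id_col"] for r in rows if r["_change_type"] == "delete"]

print([r["id_col"] for r in upserts])  # [1, 3]
print(deletes)                         # [2]
```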
Error Handling
If any write operation fails, you must revert changes to previously written tables to maintain data consistency. For complex multi-table writes, consider handling the writing and data consumption manually.
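One way to structure such manual handling is to snapshot the targets, attempt all writes, and revert on any failure, only marking the load as consumed when everything succeeded. This is a minimal in-memory sketch with hypothetical stand-ins for tables and writers, not the library's API:

```python
import copy

def write_all_or_revert(targets, updates):
    """Write to all target tables, reverting every table if any write fails.

    `targets` maps table name -> list of rows (stand-in for real tables);
    `updates` maps table name -> rows to append.
    Returns True when it is safe to mark the delta load as processed.
    """
    # Snapshot all targets before writing anything
    snapshots = {name: copy.deepcopy(tbl) for name, tbl in targets.items()}
    try:
        for name, rows in updates.items():
            if name not in targets:
                raise KeyError(f"unknown target table: {name}")
            targets[name].extend(rows)
        return True  # all writes succeeded -> consume the delta load now
    except Exception:
        # Restore every table to its pre-write state
        for name, snap in snapshots.items():
            targets[name] = snap
        return False
```

In the real pipeline the snapshot/revert step would correspond to rolling back or restoring the Delta tables you wrote to, and `loader.consume_data()` would only be called on the success path.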
Technical Implementation¶
This section details the internal workflow of the DeltaCDFLoader.
Processing Workflow¶
```mermaid
flowchart TD
    A[Start] --> B[Get Commit Versions]
    B --> C{Metadata?}
    C -->|Yes| D[Get Last Version]
    C -->|No| E[Use Min/Max Versions]
    D --> F[Set Version Range]
    E --> F
    F --> G{Same Version?}
    G -->|Yes| H[Return Empty]
    G -->|No| I[Read CDF Data]
    I --> J[Deduplicate]
    J --> K[Strip CDF Metadata]
    K --> L[Register Op]
    L --> M[Transform]
    M --> N[Merge Write]
    N --> O{Success?}
    O -->|Yes| P[Mark Done]
    O -->|No| Q[Revert]
    P --> R[End]
    Q --> S[Error]
    H --> R

    %% Dark/Light mode compatible styling
    classDef startEnd fill:#3b82f6,stroke:#1e40af,stroke-width:2px,color:#ffffff
    classDef decision fill:#f59e0b,stroke:#d97706,stroke-width:2px,color:#ffffff
    classDef process fill:#8b5cf6,stroke:#7c3aed,stroke-width:2px,color:#ffffff
    classDef success fill:#10b981,stroke:#059669,stroke-width:2px,color:#ffffff
    classDef error fill:#ef4444,stroke:#dc2626,stroke-width:2px,color:#ffffff
    classDef empty fill:#6b7280,stroke:#4b5563,stroke-width:2px,color:#ffffff
    class A,R,S startEnd
    class C,G,O decision
    class B,D,E,F,I,J,K,L,M,N process
    class P success
    class Q error
    class H empty
```

Step-by-Step Process¶
Step 1: Determine Commit Version Range
Version Discovery
- Min Version: Latest CREATE, TRUNCATE, or CDF enablement version
- Max Version: Latest version in table history
Metadata Check
- Has entries? → Use `end_commit_ver` from the last processed load
- No entries? → Use the discovered min/max versions as the range
Step 2: Read Change Data
Version Comparison
- Same versions? → Return an empty DataFrame (no changes)
- Different versions? → Read CDF data from `start_version + 1` to `end_version`
CDF Processing
- Include Delta CDF metadata columns during read
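The version-range decision above can be sketched in plain Python (the function name is illustrative, not the library's API):

```python
def cdf_read_range(last_processed: int, latest: int):
    """Return the (inclusive) commit-version range to read, or None if no new commits."""
    if last_processed >= latest:
        return None  # nothing to read -> the loader returns an empty DataFrame
    return (last_processed + 1, latest)

print(cdf_read_range(10, 10))  # None -> empty DataFrame
print(cdf_read_range(10, 14))  # (11, 14)
```

Starting at `last_processed + 1` ensures each commit is read exactly once across consecutive delta loads.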
Step 3: Data Processing
Deduplication (Optional)
- Remove duplicate records based on deduplication_columns
- Keep only the latest change per unique key
Metadata Cleanup
- Strip CDF metadata columns before writing to target
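Both parts of this step can be sketched with plain Python rows; the data and helper names below are hypothetical stand-ins, while on real DataFrames the loader performs the equivalent operations:

```python
# Illustrative CDF change rows for one key seen twice and another seen once
changes = [
    {"id_col": 1, "status": "new", "_change_type": "insert", "_commit_version": 3},
    {"id_col": 1, "status": "shipped", "_change_type": "update_postimage", "_commit_version": 7},
    {"id_col": 2, "status": "new", "_change_type": "insert", "_commit_version": 5},
]

def deduplicate(rows, dedup_cols):
    """Keep only the latest change per unique key, ordered by commit version."""
    latest = {}
    for row in sorted(rows, key=lambda r: r["_commit_version"]):
        latest[tuple(row[c] for c in dedup_cols)] = row
    return list(latest.values())

def strip_cdf_metadata(rows):
    """Drop CDF metadata columns (prefixed with '_') before writing to the target."""
    return [{k: v for k, v in r.items() if not k.startswith("_")} for r in rows]

result = strip_cdf_metadata(deduplicate(changes, ["id_col"]))
print(result)  # [{'id_col': 1, 'status': 'shipped'}, {'id_col': 2, 'status': 'new'}]
```

Keeping only the latest change per key is what makes the subsequent merge idempotent: the target ends up in the same state as replaying every intermediate change in order.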
Step 4: Register and Write
Metadata Registration
Create new metadata entry with:
- `start_commit_ver` and `end_commit_ver`
- `is_processed`: set to `false` initially
Target Update
- Apply transformations as needed
- Use merge operations to handle inserts and updates
Step 5: Finalize Transaction
Success Path
- Mark metadata entry as is_processed = true
- Transaction complete
Failure Path
- Revert any applied changes (this must be handled manually!)
Best Practices
- Enable CDF on source tables before starting delta loads
- Use appropriate deduplication columns for your use case
- Monitor commit version gaps in your metadata table
- Test delta load logic with small datasets first
- Consider implementing custom delete handling if required