Delta Timestamp Loader
The DeltaTimestampLoader enables incremental data loading by filtering on one or more timestamp columns. This strategy is ideal for time-series data where you want to process only new records since the last load.
Key Features
- Timestamp-based filtering: Uses timestamp columns to identify new data
- Metadata tracking: Automatically tracks the last read timestamp
- Flexible bounds: Supports custom time ranges with `from_timestamp` and `to_timestamp`
- Multiple pipelines: Uses `delta_load_identifier` to support multiple loads from the same source table
Metadata Management
The Delta Timestamp Loader maintains transaction history in a dedicated metadata table:
| Column | Description | 
|---|---|
| last_read_timestamp | The timestamp of the last successfully processed record | 
| delta_load_identifier | Unique identifier to distinguish different loads from the same source | 
| is_processed | Boolean flag indicating if the load completed successfully | 
| is_stale | Boolean flag indicating if the metadata entry is outdated | 
Multiple Pipeline Support
The delta_load_identifier allows you to run multiple delta loads from the same source table for different use cases or pipelines without conflicts.
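The sketch below shows how the metadata columns might fit together. It picks the lower bound for one pipeline from an in-memory list of entries. `MetadataEntry` and `latest_read_timestamp` are hypothetical names. The selection rule is an assumption about how the loader uses the table, not its actual implementation: take the newest entry that is processed and not stale, for a single `delta_load_identifier`.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional


@dataclass(frozen=True)
class MetadataEntry:
    # Hypothetical record mirroring the metadata table columns
    delta_load_identifier: str
    last_read_timestamp: datetime
    is_processed: bool
    is_stale: bool


def latest_read_timestamp(
    entries: list[MetadataEntry], delta_load_identifier: str
) -> Optional[datetime]:
    """Newest timestamp of completed, non-stale loads for one identifier.

    Returns None when no usable entry exists (for example, on the first run).
    Assumed selection rule, for illustration only.
    """
    candidates = [
        e.last_read_timestamp
        for e in entries
        if e.delta_load_identifier == delta_load_identifier
        and e.is_processed
        and not e.is_stale
    ]
    return max(candidates, default=None)


entries = [
    MetadataEntry("daily_orders_load", datetime(2025, 8, 25, 10, 30), True, False),
    MetadataEntry("daily_orders_load", datetime(2025, 8, 26, 10, 30), True, False),
    # Not completed, so it is ignored
    MetadataEntry("daily_orders_load", datetime(2025, 8, 27, 10, 30), False, False),
    # A second pipeline on the same source keeps its own position
    MetadataEntry("recent_activity_load", datetime(2025, 8, 20, 8, 0), True, False),
]
print(latest_read_timestamp(entries, "daily_orders_load"))  # 2025-08-26 10:30:00
print(latest_read_timestamp(entries, "new_pipeline"))  # None
```

Because the lookup is keyed by `delta_load_identifier`, two pipelines that read the same source table never move each other's lower bound.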
Quick Start Guide
This example demonstrates how to set up incremental loading using the timestamp strategy.
```python
from cloe_nessy.integration.writer import CatalogWriter
from cloe_nessy.delta_loader import DeltaLoadOptions, DeltaLoaderFactory

# Create Delta Writer for the target table
delta_writer = CatalogWriter()

# Configure Timestamp Delta Load
timestamp_delta_load_options = DeltaLoadOptions(
    strategy="Timestamp",
    delta_load_identifier="example_delta_load_identifier",
    strategy_options={
        "timestamp_filter_cols": ["timestamp_col"],
    },
)

# Create the Loader
# `source_table` must be defined beforehand; its `identifier` names the source table
loader = DeltaLoaderFactory.create_loader(
    table_identifier=source_table.identifier,
    options=timestamp_delta_load_options,
)

# Read only the new data according to the delta load strategy
df = loader.get_data()

# Write Data to Target Table
delta_writer.write_table(
    table_identifier="catalog.schema.table",
    data_frame=df,
)

# Mark Delta Load as Processed (only after the write succeeded)
loader.consume_data()
```
Configuration parameters (`timestamp_filter_cols`, `from_timestamp`, `to_timestamp`, and `filter_method` are passed inside `strategy_options`):

| Parameter | Type | Description |
|---|---|---|
| `strategy` | `str` | Must be set to `"Timestamp"` |
| `delta_load_identifier` | `str` | Unique identifier for this delta load |
| `timestamp_filter_cols` | `list[str]` | Timestamp column names to filter on |
| `from_timestamp` | `datetime` (optional) | Custom lower bound for timestamp filtering (inclusive, `>=`) |
| `to_timestamp` | `datetime` (optional) | Custom upper bound for timestamp filtering (exclusive, `<`) |
| `filter_method` | `str` (optional) | Logical operator for combining multiple columns: `"AND"`, `"OR"`, `"&&"`, `"\|\|"` |
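The quick start follows this workflow:

- Initialize: Configure the delta load options and create the loader and writer
- Load: Read incremental data using timestamp filtering (`loader.get_data()`)
- Process: Apply any necessary transformations
- Write: Append the data to the target table (`delta_writer.write_table(...)`)
- Commit: Mark the delta load as processed (`loader.consume_data()`)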
Error Handling
If any write operation fails, revert the changes already made to previously written tables to keep the data consistent, and do not mark the delta load as processed. For complex multi-table writes, consider handling the writing and data consumption manually instead of using the `write_data` wrapper.
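The manual approach can be sketched with hypothetical in-memory tables. The pattern has three parts. Write every target in order. If a later write fails, revert the tables already written, in reverse order. Call the commit step only after all writes succeed.

`InMemoryTable`, `write_all_then_consume`, and the `consume` callback are illustrative names, not part of the library. In a real pipeline, `consume` would correspond to `loader.consume_data()`. Reverting a Delta table would use your platform's rollback mechanism, for example Delta Lake's `RESTORE TABLE ... TO VERSION AS OF`.

```python
from typing import Callable


class InMemoryTable:
    """Hypothetical stand-in for a target table that supports reverting."""

    def __init__(self, name: str, fail_on_write: bool = False) -> None:
        self.name = name
        self.rows: list[dict] = []
        self.fail_on_write = fail_on_write

    def append(self, rows: list[dict]) -> int:
        if self.fail_on_write:
            raise RuntimeError(f"write to {self.name} failed")
        self.rows.extend(rows)
        return len(rows)  # number of rows appended, needed for reverting

    def revert(self, appended: int) -> None:
        # Drop the rows appended during this load
        del self.rows[len(self.rows) - appended:]


def write_all_then_consume(
    tables: list[InMemoryTable], rows: list[dict], consume: Callable[[], None]
) -> None:
    written: list[tuple[InMemoryTable, int]] = []
    try:
        for table in tables:
            written.append((table, table.append(rows)))
    except Exception:
        # Revert previously written tables in reverse order, then re-raise
        for table, appended in reversed(written):
            table.revert(appended)
        raise
    # Mark the delta load as processed only once every write succeeded
    consume()


consumed: list[bool] = []
silver = InMemoryTable("silver")
gold = InMemoryTable("gold", fail_on_write=True)
try:
    write_all_then_consume([silver, gold], [{"id": 1}], lambda: consumed.append(True))
except RuntimeError:
    pass
print(silver.rows, consumed)  # [] []
```

Because `consume` is called only after the loop, a failed write leaves the metadata untouched. The next run then reads the same records again.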
Filter Examples
The following examples show how different configurations translate into the generated filter conditions.
Single Column Filtering
For a simple use case with one timestamp column:
```python
from cloe_nessy.delta_loader import DeltaLoadOptions

# Example: Loading new orders since last run
timestamp_delta_load_options = DeltaLoadOptions(
    strategy="Timestamp",
    delta_load_identifier="daily_orders_load",
    strategy_options={
        "timestamp_filter_cols": ["created_at"],
        # No filter_method needed for single column
    },
)
```

Generated SQL filter:

```sql
created_at >= '2025-08-26T10:30:00' AND created_at < '2025-08-27T10:30:00'
```
Multiple Columns with AND Logic
When records must satisfy ALL timestamp conditions (e.g., created AND updated after a certain time):
```python
from cloe_nessy.delta_loader import DeltaLoadOptions

# Example: Records that were both created and updated recently
timestamp_delta_load_options = DeltaLoadOptions(
    strategy="Timestamp",
    delta_load_identifier="recent_activity_load",
    strategy_options={
        "timestamp_filter_cols": ["created_at", "updated_at"],
        "filter_method": "AND",  # Both conditions must be true
    },
)
```

Generated SQL filter:

```sql
(created_at >= '2025-08-26T10:30:00' AND updated_at >= '2025-08-26T10:30:00')
AND
(created_at < '2025-08-27T10:30:00' AND updated_at < '2025-08-27T10:30:00')
```
Multiple Columns with OR Logic
When records need to satisfy ANY timestamp condition (e.g., created OR updated after a certain time):
```python
from cloe_nessy.delta_loader import DeltaLoadOptions

# Example: Records with any recent activity
timestamp_delta_load_options = DeltaLoadOptions(
    strategy="Timestamp",
    delta_load_identifier="any_activity_load",
    strategy_options={
        "timestamp_filter_cols": ["created_at", "updated_at", "last_accessed"],
        "filter_method": "OR",  # Any condition can be true
    },
)
```

Generated SQL filter:

```sql
(created_at >= '2025-08-26T10:30:00' OR updated_at >= '2025-08-26T10:30:00' OR last_accessed >= '2025-08-26T10:30:00')
AND
(created_at < '2025-08-27T10:30:00' OR updated_at < '2025-08-27T10:30:00' OR last_accessed < '2025-08-27T10:30:00')
```
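The filters above follow a regular pattern:

- one `>=` condition per column for the lower bound;
- one `<` condition per column for the upper bound;
- each group joined by `filter_method`;
- the two groups combined with `AND`.

The hypothetical `build_timestamp_filter` below reproduces that pattern as plain string building. It is a sketch for understanding the shape of the filter, not the loader's implementation. Two details are assumptions: treating `&&`/`||` as aliases for `AND`/`OR`, and the `"AND"` default.

```python
from datetime import datetime

# Assumption: "&&" and "||" are aliases for AND and OR, as the parameter table suggests
_OPERATORS = {"AND": "AND", "&&": "AND", "OR": "OR", "||": "OR"}


def build_timestamp_filter(
    cols: list[str],
    from_ts: datetime,
    to_ts: datetime,
    filter_method: str = "AND",  # hypothetical default, not documented
) -> str:
    """Build a filter string with the same shape as the generated filters above.

    Illustration only: values are interpolated directly into the string,
    so never use this pattern with untrusted input.
    """
    joiner = f" {_OPERATORS[filter_method.upper()]} "
    lower = joiner.join(f"{col} >= '{from_ts.isoformat()}'" for col in cols)
    upper = joiner.join(f"{col} < '{to_ts.isoformat()}'" for col in cols)
    if len(cols) == 1:
        # A single column needs no grouping, so filter_method has no effect
        return f"{lower} AND {upper}"
    # Lower-bound group AND upper-bound group
    return f"({lower}) AND ({upper})"


start, end = datetime(2025, 8, 26, 10, 30), datetime(2025, 8, 27, 10, 30)
print(build_timestamp_filter(["created_at"], start, end))
# created_at >= '2025-08-26T10:30:00' AND created_at < '2025-08-27T10:30:00'
print(build_timestamp_filter(["created_at", "updated_at"], start, end, "OR"))
```

Note that `filter_method` only changes the operator inside each group. The lower-bound and upper-bound groups are always combined with `AND`.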
Custom Time Range
You can also specify custom boundaries instead of using metadata timestamps:
```python
from datetime import datetime

from cloe_nessy.delta_loader import DeltaLoadOptions

# Example: Load all events from August 2025
# The upper bound is exclusive (<), so use the first instant of September
timestamp_delta_load_options = DeltaLoadOptions(
    strategy="Timestamp",
    delta_load_identifier="custom_range_load",
    strategy_options={
        "timestamp_filter_cols": ["event_time"],
        "from_timestamp": datetime(2025, 8, 1, 0, 0, 0),
        "to_timestamp": datetime(2025, 9, 1, 0, 0, 0),
    },
)
```

Generated SQL filter:

```sql
event_time >= '2025-08-01T00:00:00' AND event_time < '2025-09-01T00:00:00'
```
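Because the generated upper-bound condition uses `<`, a custom range behaves as a half-open interval `[from_timestamp, to_timestamp)`. The sketch below picks such bounds for a full calendar month. `month_window` and `in_window` are hypothetical helpers; `in_window` only mirrors the comparison in the generated filter.

```python
from datetime import datetime


def month_window(year: int, month: int) -> tuple[datetime, datetime]:
    """Return [start, end) covering a whole calendar month."""
    start = datetime(year, month, 1)
    # The exclusive end is the first instant of the following month
    end = datetime(year + 1, 1, 1) if month == 12 else datetime(year, month + 1, 1)
    return start, end


def in_window(ts: datetime, start: datetime, end: datetime) -> bool:
    # Same comparison as the generated filter: col >= start AND col < end
    return start <= ts < end


start, end = month_window(2025, 8)
late_event = datetime(2025, 8, 31, 23, 59, 59, 500000)
print(in_window(late_event, start, end))  # True
# An end of 23:59:59 silently drops the last second of the month
print(in_window(late_event, start, datetime(2025, 8, 31, 23, 59, 59)))  # False
```

Using the first instant of the next period as the exclusive end avoids gaps and overlaps when consecutive windows are loaded one after another.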
Filter Method Guidelines
- Use AND when a record should qualify only if all of its timestamp columns fall within the window (e.g., both creation and modification timestamps must be recent)
- Use OR when you want to capture any activity (e.g., records that were created, updated, or accessed recently)
- A single column doesn't require the `filter_method` parameter
Performance Considerations
- AND filters are typically more selective and may perform better
- OR filters can be broader and may return more data
- Ensure filters on your timestamp columns can benefit from data skipping (for Delta tables, for example through Z-ordering or liquid clustering on those columns)
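To see why AND is more selective than OR, the sketch below evaluates the documented filter shape against three in-memory records. `matches` and the sample data are hypothetical. The comparisons mirror the generated SQL, with `all` standing in for AND and `any` for OR.

```python
from datetime import datetime

FROM, TO = datetime(2025, 8, 26, 10, 30), datetime(2025, 8, 27, 10, 30)


def matches(record: dict, cols: list[str], method: str) -> bool:
    """Evaluate the documented filter shape on one in-memory record."""
    combine = all if method == "AND" else any
    lower_ok = combine(record[c] >= FROM for c in cols)
    upper_ok = combine(record[c] < TO for c in cols)
    return lower_ok and upper_ok


records = [
    # Created and updated inside the window
    {"id": 1, "created_at": datetime(2025, 8, 26, 12, 0), "updated_at": datetime(2025, 8, 26, 13, 0)},
    # Created long ago, updated inside the window
    {"id": 2, "created_at": datetime(2025, 1, 5, 9, 0), "updated_at": datetime(2025, 8, 27, 8, 0)},
    # Neither timestamp inside the window
    {"id": 3, "created_at": datetime(2025, 1, 5, 9, 0), "updated_at": datetime(2025, 2, 1, 9, 0)},
]
cols = ["created_at", "updated_at"]
and_ids = [r["id"] for r in records if matches(r, cols, "AND")]
or_ids = [r["id"] for r in records if matches(r, cols, "OR")]
print(and_ids, or_ids)  # [1] [1, 2]
```

Every record that passes the AND filter also passes the OR filter, so AND can never return more rows than OR for the same columns and bounds.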
Technical Implementation
This section details the internal workflow of the DeltaTimestampLoader.
Processing Workflow
```mermaid
flowchart TD
    A[🚀 Start] --> B{📋 Metadata?}
    B -->|✅ Yes| C[🕐 Get Timestamp]
    B -->|❌ No| D[🆕 Full Load]
    C --> E[⚙️ Apply Bounds]
    D --> E
    E --> F[🔍 Filter Data]
    F --> G[📝 Register Op]
    G --> H[🔄 Transform]
    H --> I[💾 Write]
    I --> J{✅ Success?}
    J -->|✅| K[✔️ Mark Done]
    J -->|❌| L[🔄 Revert]
    K --> M[🎉 End]
    L --> N[⚠️ Error]
    %% Dark/Light mode compatible styling
    classDef startEnd fill:#3b82f6,stroke:#1e40af,stroke-width:2px,color:#ffffff
    classDef decision fill:#f59e0b,stroke:#d97706,stroke-width:2px,color:#ffffff
    classDef process fill:#8b5cf6,stroke:#7c3aed,stroke-width:2px,color:#ffffff
    classDef success fill:#10b981,stroke:#059669,stroke-width:2px,color:#ffffff
    classDef error fill:#ef4444,stroke:#dc2626,stroke-width:2px,color:#ffffff
    class A,M,N startEnd
    class B,J decision
    class C,D,E,F,G,H,I process
    class K success
    class L error
```
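The first half of the workflow can be sketched as two small functions. This covers the metadata decision, applying bounds, and filtering the data. `resolve_bounds` and `filter_rows` are hypothetical. The precedence rule is inferred from the sections above rather than taken from the loader's source: an explicit `from_timestamp` overrides the metadata timestamp, and missing metadata means a full load. How the upper bound is chosen when no `to_timestamp` is given is not documented here, so the sketch leaves it open.

```python
from datetime import datetime
from typing import Optional


def resolve_bounds(
    last_read_timestamp: Optional[datetime],
    from_timestamp: Optional[datetime] = None,
    to_timestamp: Optional[datetime] = None,
) -> tuple[Optional[datetime], Optional[datetime]]:
    """Metadata check and Apply Bounds steps (assumed precedence)."""
    # A custom from_timestamp wins; otherwise use metadata; None means a full load
    lower = from_timestamp if from_timestamp is not None else last_read_timestamp
    return lower, to_timestamp


def filter_rows(
    rows: list[dict], col: str, lower: Optional[datetime], upper: Optional[datetime]
) -> list[dict]:
    """Filter Data step: keep rows in [lower, upper); None disables a bound."""
    return [
        r
        for r in rows
        if (lower is None or r[col] >= lower) and (upper is None or r[col] < upper)
    ]


rows = [{"event_time": datetime(2025, 8, d)} for d in (1, 15, 30)]
# First run: no metadata, so every row is loaded
print(len(filter_rows(rows, "event_time", *resolve_bounds(None))))  # 3
# Later run: metadata present, so only newer rows are loaded
print(len(filter_rows(rows, "event_time", *resolve_bounds(datetime(2025, 8, 10)))))  # 2
```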