DeltaCDFConfig

Bases: BaseModel

This class holds the config for the DeltaCDFLoader.

Parameters:

deduplication_columns (list[str | Column] | None): A list of columns used for deduplication. Defaults to None.

from_commit_version (int | None): The starting commit version. If None, it starts from the first viable version. Defaults to None.

to_commit_version (int | None): The ending commit version. If None, it goes up to the latest version. Defaults to None.

enable_full_load (bool): Enables an initial full load of the target table. If no valid delta load history exists for the table, the delta loader performs a full load of the target table and sets the metadata to the newest commit version. This can be useful if the change data feed history is incomplete, either because the table was vacuumed or because the change data feed was enabled later in the table's lifecycle. Otherwise the table is initially loaded from the first valid commit version. When True, from_commit_version and to_commit_version are ignored on the initial load. Defaults to False.
Source code in src/cloe_nessy/integration/delta_loader/strategies/delta_cdf_loader.py
class DeltaCDFConfig(BaseModel):
    """This class holds the config for the DeltaCDFLoader.

    Args:
        deduplication_columns: A list of columns used for deduplication.
        from_commit_version: The starting commit version. If None, it starts from the first viable version.
        to_commit_version: The ending commit version. If None, it goes up to the latest version.
        enable_full_load: Enables an initial full load of the target table. If
            no valid delta load history for the table exists, the delta loader
            will do a full load of the target table and set the metadata to the
            newest commit version. This might be useful if the change data feed
            history is incomplete, either because the table was vacuumed or the
            change data feed was enabled later in the lifecycle of the table.
            Otherwise the table will initially be loaded from the first valid
            commit version. When True, `from_commit_version` and
            `to_commit_version` will be ignored on the initial load. Defaults to
            False.
    """

    deduplication_columns: list[str | Column] | None = Field(default=None)
    from_commit_version: int | None = Field(default=None)
    to_commit_version: int | None = Field(default=None)
    enable_full_load: bool = Field(default=False)
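
A minimal construction sketch (the import path follows the source location shown above and may differ in your installation; identifiers are hypothetical):

from cloe_nessy.integration.delta_loader.strategies.delta_cdf_loader import DeltaCDFConfig

# Read changes between commit versions 10 and 20 and deduplicate on a business key.
config = DeltaCDFConfig(
    deduplication_columns=["order_id"],
    from_commit_version=10,
    to_commit_version=20,
    enable_full_load=False,
)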

DeltaCDFLoader

Bases: DeltaLoader

Implementation of the DeltaLoader interface using CDF strategy.

Parameters:

config (DeltaCDFConfig): Configuration for the DeltaCDFLoader. Required.

table_identifier (str): Identifier for the table to be loaded. Required.

delta_load_identifier (str): Identifier for the delta load. Required.

metadata_table_identifier (str | None): Identifier for the metadata table. Defaults to None.
Source code in src/cloe_nessy/integration/delta_loader/strategies/delta_cdf_loader.py
class DeltaCDFLoader(DeltaLoader):
    """Implementation of the DeltaLoader interface using CDF strategy.

    Args:
        config: Configuration for the DeltaCDFLoader.
        table_identifier: Identifier for the table to be loaded.
        delta_load_identifier: Identifier for the delta load.
        metadata_table_identifier: Identifier for the metadata table. Defaults to None.
    """

    def __init__(
        self,
        config: DeltaCDFConfig,
        table_identifier: str,
        delta_load_identifier: str,
        metadata_table_identifier: str | None = None,
    ):
        super().__init__(
            table_identifier,
            delta_load_identifier,
            metadata_table_identifier,
        )
        self.config = config
        self.table_reader = self._spark.read

    def _check_cdf_enabled(self, table_identifier: str) -> bool:
        """Checks if Change Data Feed is enabled for the table."""
        try:
            # Try catalog table approach first (for table names like catalog.schema.table)
            if table_identifier.count(".") == 2 and not table_identifier.startswith("/"):
                table_properties = self._query(f"SHOW TBLPROPERTIES {table_identifier}").collect()
                properties_dict = {row["key"]: row["value"] for row in table_properties}
                value = properties_dict.get("delta.enableChangeDataFeed", "false")
                return str(value).lower() == "true"
            # For file paths, use Delta Table API directly
            from delta import DeltaTable  # type: ignore[import-untyped]

            delta_table = DeltaTable.forPath(self._spark, table_identifier)
            properties = delta_table.detail().select("properties").collect()[0]["properties"]
            value = properties.get("delta.enableChangeDataFeed", "false") if properties else "false"
            return str(value).lower() == "true"
        except Exception:
            # If we can't determine CDF status, assume it's not enabled
            return False

    def _has_valid_metadata(self) -> bool:
        """Checks if valid (i.e. non-stale) metadata exists for the delta load."""
        try:
            df = self._spark.sql(f"""
                SELECT * FROM {self.metadata_table_identifier}
                WHERE source_table_identifier = '{self.table_identifier}'
                AND delta_load_identifier = '{self.delta_load_identifier}'
                AND is_processed = true
                AND is_stale = false
            """)
            return not df.isEmpty()
        except Exception as e:
            self._console_logger.warning(f"Error accessing metadata table: {e}")
            return False

    def _get_commit_versions(self) -> tuple[int, int]:
        """Retrieves the starting and ending commit versions for CDF data."""

        def _get_metadata_df() -> DataFrame:
            df = self.table_reader.table(self.metadata_table_identifier)
            return df.filter(
                (F.col("source_table_identifier") == self.table_identifier)
                & (F.col("delta_load_identifier") == self.delta_load_identifier)
                & F.col("is_processed")
                & ~F.col("is_stale"),
            )

        def _get_commit_version(query: DataFrame, version_filter: str | None = None) -> int | None:
            if version_filter is not None:
                query = query.filter(version_filter)
            row = query.selectExpr("max(version)").first()
            if row is None or row[0] is None:
                return None
            # Add type validation before casting
            version_value = row[0]
            if not isinstance(version_value, (int | float)) or isinstance(version_value, bool):
                raise TypeError(f"Expected numeric version, got {type(version_value)}: {version_value}")
            return int(version_value)

        metadata_df = _get_metadata_df()
        self._console_logger.info("Querying table history to find minimum version.")
        min_version_filter = None
        if self.config.from_commit_version is not None:
            min_version_filter = f"version >= {self.config.from_commit_version}"
        # Handle history queries for both catalog tables and file paths
        if self.table_identifier.count(".") == 2 and not self.table_identifier.startswith("/"):
            # Catalog table
            history_query = f"DESCRIBE HISTORY {self.table_identifier}"
        else:
            # File path - need to use delta.`path` format
            history_query = f"DESCRIBE HISTORY delta.`{self.table_identifier}`"

        min_commit_version = _get_commit_version(
            self._query(history_query).filter(
                "operation like 'CREATE%' OR operation = 'TRUNCATE' OR operationParameters.properties like '%delta.enableChangeDataFeed%' "
            ),
            min_version_filter,
        )
        if min_commit_version is None:
            min_commit_version = 0

        max_version_filter = None
        if self.config.to_commit_version is not None:
            max_version_filter = f"version <= {self.config.to_commit_version}"
        max_commit_version = _get_commit_version(
            self._query(history_query),
            max_version_filter,
        )
        if min_commit_version is None or max_commit_version is None:
            raise RuntimeError(f"No valid versions found for Table [ '{self.table_identifier}' ].")

        # Handle cases based on metadata
        if metadata_df.isEmpty():
            # Case 1: No metadata found, read all versions (first delta load)
            self._console_logger.info("No CDF History for this identifier, reading all versions.")
            commit_tuple = (min_commit_version, max_commit_version)
            self._console_logger.info(f"Reading Versions: {commit_tuple}")
            return commit_tuple

        start_commit_row = metadata_df.agg(F.max("end_commit_version")).first()
        start_commit_version = start_commit_row[0] if start_commit_row is not None else None
        if start_commit_version is None:
            # Case 2: No processed version found in metadata, treat as no metadata
            self._console_logger.info("No processed version found in metadata, reading all versions.")
            commit_tuple = (min_commit_version, max_commit_version)
            self._console_logger.info(f"Reading Versions: {commit_tuple}")
            return commit_tuple

        if start_commit_version > max_commit_version:
            # Case 3: Last processed version in metadata is greater than last version in table history
            # This can happen if the table is recreated after the last processed version
            raise RuntimeError(
                f"Table ['{self.table_identifier}'] history and CDF metadata are incompatible. "
                "Either reset the CDF metadata and recreate the target table from scratch,"
                "or repair CDF metadata."
            )

        if min_commit_version > start_commit_version:
            # Case 4: First version in table history is greater than last processed version in metadata
            # This can happen if the table is truncated after the last processed version
            self._console_logger.info("The first version in Table history is greater than the last processed version.")
            commit_tuple = (min_commit_version, max_commit_version)
            self._console_logger.info(f"Reading Versions: {commit_tuple}")
            return commit_tuple

        # Case 5: Normal case, read from last processed version to last available version
        self._console_logger.info("Reading from the last processed version to the last available version.")
        commit_tuple = (start_commit_version, max_commit_version)
        self._console_logger.info(f"Reading Versions: {commit_tuple}")
        return commit_tuple

    def verify(self) -> None:
        """Verify that the source table has the Change Data Feed enabled."""
        self._console_logger.info("Verifying table is enabled for Change Data Feed.")
        if not self._check_cdf_enabled(self.table_identifier):
            raise RuntimeError(f"Table {self.table_identifier} is not enabled for Change Data Feed.")

    def _full_load(self, options: dict[str, str]) -> DataFrame:
        self._console_logger.info(f"Performing full load from source table: {self.table_identifier}")

        # Handle history queries for both catalog tables and file paths
        if self.table_identifier.count(".") == 2 and not self.table_identifier.startswith("/"):
            # Catalog table
            history_query = f"DESCRIBE HISTORY {self.table_identifier}"
        else:
            # File path - need to use delta.`path` format
            history_query = f"DESCRIBE HISTORY delta.`{self.table_identifier}`"

        max_version_query = self._query(history_query).selectExpr("max(version)").first()
        if not max_version_query or max_version_query[0] is None:
            raise RuntimeError(f"No valid versions found for Table [ '{self.table_identifier}' ].")

        # Add type validation before casting
        version_value = max_version_query[0]
        if not isinstance(version_value, (int | float)) or isinstance(version_value, bool):
            raise TypeError(f"Expected numeric version, got {type(version_value)}: {version_value}")

        start_version = 0
        end_version = int(version_value)
        start_commit_timestamp = None
        end_commit_timestamp = None

        self.table_reader.options(**options)

        # Handle table reading for both catalog tables and file paths
        if self.table_identifier.count(".") == 2 and not self.table_identifier.startswith("/"):
            # Catalog table
            df = self.table_reader.table(self.table_identifier)
        else:
            # File path - use load method
            df = self.table_reader.load(self.table_identifier)

        # Cache the DataFrame since it will be used for both counting and returning
        df.cache()
        row_count = df.count()

        self._create_metadata_entry(
            rows=row_count,
            last_read_timestamp=end_commit_timestamp,
            start_version=start_version,
            end_version=end_version,
            start_commit_timestamp=start_commit_timestamp,
            end_commit_timestamp=end_commit_timestamp,
        )

        # Note: We keep the DataFrame cached since it's returned to the caller
        # The caller is responsible for unpersisting when done
        return df

    def _delta_load(self, options: dict[str, str]) -> DataFrame:
        self._console_logger.info(f"Performing delta load from source table: {self.table_identifier}")
        start_version, end_version = self._get_commit_versions()

        self._invalidate_versions()

        if start_version != end_version:
            # Increment version by one to avoid reading the same version twice
            read_start_version = str(start_version + 1)
        else:
            read_start_version = str(start_version)

        self._console_logger.info(f"Reading commit versions: (from: {read_start_version}, to: {str(end_version)})")
        # Set CDF-specific options
        self.table_reader.option("readChangeFeed", "true")
        self.table_reader.option("startingVersion", read_start_version)
        self.table_reader.option("endingVersion", str(end_version))

        # Set additional options
        for key, value in options.items():
            self.table_reader.option(key, str(value))

        # Handle table reading for both catalog tables and file paths
        if self.table_identifier.count(".") == 2 and not self.table_identifier.startswith("/"):
            # Catalog table
            df = self.table_reader.table(self.table_identifier)
        else:
            # File path - use load method
            df = self.table_reader.load(self.table_identifier)

        df = df.filter("_change_type <> 'update_preimage'")

        # Cache the DataFrame as it will be used multiple times
        df.cache()

        # Optimize timestamp extraction by combining operations
        start_commit_timestamp = None
        end_commit_timestamp = None

        if start_version != end_version:
            # Combine both timestamp extractions into a single operation
            timestamp_df = (
                df.filter(F.col("_commit_version").isin([start_version, end_version]))
                .select("_commit_version", "_commit_timestamp")
                .collect()
            )

            timestamp_map = {row["_commit_version"]: row["_commit_timestamp"] for row in timestamp_df}
            start_commit_timestamp = timestamp_map.get(start_version)
            end_commit_timestamp = timestamp_map.get(end_version)

        # Handle case where start_version == end_version
        if start_version == end_version:
            df = df.limit(0)
            row_count = 0
        else:
            row_count = df.count()

        self._create_metadata_entry(
            rows=row_count,
            last_read_timestamp=end_commit_timestamp,
            start_version=start_version,
            end_version=end_version,
            start_commit_timestamp=start_commit_timestamp,
            end_commit_timestamp=end_commit_timestamp,
        )
        # Remove duplicates introduced by CDF. This happens if a row is changed
        # in multiple read versions. We are only interested in the latest
        # change.
        if self.config.deduplication_columns:
            key_columns = self.config.deduplication_columns
            key_column_names = [col.name if isinstance(col, Column) else col for col in key_columns]
            self._console_logger.info(f"Deduplicating with columns: {key_column_names}")
            window_spec = (
                Window.partitionBy(*key_column_names)
                .orderBy(F.desc("_commit_version"))
                .rowsBetween(Window.unboundedPreceding, Window.currentRow)
            )

            row_number_col_name = generate_unique_column_name(existing_columns=set(df.columns), prefix="row_num")

            df = (
                df.withColumn(row_number_col_name, F.row_number().over(window_spec))
                .filter(F.col(row_number_col_name) == 1)
                .drop(row_number_col_name)
            )

        # Strip CDF metadata columns and unpersist the intermediate cache
        result_df = df.drop("_commit_version", "_commit_timestamp")

        # Unpersist the cached DataFrame to free memory
        df.unpersist()

        return result_df

    def read_data(
        self,
        options: dict[str, str] | None = None,
    ) -> DataFrame:
        """Reads data using the CDF strategy.

        Args:
            options: Additional DataFrameReader options.
        """
        self.verify()
        options = options or {}
        do_full_load = self.config.enable_full_load and not self._has_valid_metadata()

        if do_full_load:
            return self._full_load(options)

        return self._delta_load(options)
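
Usage sketch with hypothetical identifiers (an active Spark session with Delta Lake support is assumed; the loader creates or reuses the metadata table on construction):

config = DeltaCDFConfig(deduplication_columns=["order_id"])
loader = DeltaCDFLoader(
    config=config,
    table_identifier="my_catalog.my_schema.orders",
    delta_load_identifier="orders_to_silver",
)

changes_df = loader.read_data()   # full or incremental read, depending on existing metadata
# ... merge `changes_df` into the target table ...
loader.consume_data()             # mark the read commit range as processed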

_check_cdf_enabled(table_identifier)

Checks if Change Data Feed is enabled for the table.

Source code in src/cloe_nessy/integration/delta_loader/strategies/delta_cdf_loader.py
def _check_cdf_enabled(self, table_identifier: str) -> bool:
    """Checks if Change Data Feed is enabled for the table."""
    try:
        # Try catalog table approach first (for table names like catalog.schema.table)
        if table_identifier.count(".") == 2 and not table_identifier.startswith("/"):
            table_properties = self._query(f"SHOW TBLPROPERTIES {table_identifier}").collect()
            properties_dict = {row["key"]: row["value"] for row in table_properties}
            value = properties_dict.get("delta.enableChangeDataFeed", "false")
            return str(value).lower() == "true"
        # For file paths, use Delta Table API directly
        from delta import DeltaTable  # type: ignore[import-untyped]

        delta_table = DeltaTable.forPath(self._spark, table_identifier)
        properties = delta_table.detail().select("properties").collect()[0]["properties"]
        value = properties.get("delta.enableChangeDataFeed", "false") if properties else "false"
        return str(value).lower() == "true"
    except Exception:
        # If we can't determine CDF status, assume it's not enabled
        return False

_get_commit_versions()

Retrieves the starting and ending commit versions for CDF data.

Source code in src/cloe_nessy/integration/delta_loader/strategies/delta_cdf_loader.py
def _get_commit_versions(self) -> tuple[int, int]:
    """Retrieves the starting and ending commit versions for CDF data."""

    def _get_metadata_df() -> DataFrame:
        df = self.table_reader.table(self.metadata_table_identifier)
        return df.filter(
            (F.col("source_table_identifier") == self.table_identifier)
            & (F.col("delta_load_identifier") == self.delta_load_identifier)
            & F.col("is_processed")
            & ~F.col("is_stale"),
        )

    def _get_commit_version(query: DataFrame, version_filter: str | None = None) -> int | None:
        if version_filter is not None:
            query = query.filter(version_filter)
        row = query.selectExpr("max(version)").first()
        if row is None or row[0] is None:
            return None
        # Add type validation before casting
        version_value = row[0]
        if not isinstance(version_value, (int | float)) or isinstance(version_value, bool):
            raise TypeError(f"Expected numeric version, got {type(version_value)}: {version_value}")
        return int(version_value)

    metadata_df = _get_metadata_df()
    self._console_logger.info("Querying table history to find minimum version.")
    min_version_filter = None
    if self.config.from_commit_version is not None:
        min_version_filter = f"version >= {self.config.from_commit_version}"
    # Handle history queries for both catalog tables and file paths
    if self.table_identifier.count(".") == 2 and not self.table_identifier.startswith("/"):
        # Catalog table
        history_query = f"DESCRIBE HISTORY {self.table_identifier}"
    else:
        # File path - need to use delta.`path` format
        history_query = f"DESCRIBE HISTORY delta.`{self.table_identifier}`"

    min_commit_version = _get_commit_version(
        self._query(history_query).filter(
            "operation like 'CREATE%' OR operation = 'TRUNCATE' OR operationParameters.properties like '%delta.enableChangeDataFeed%' "
        ),
        min_version_filter,
    )
    if min_commit_version is None:
        min_commit_version = 0

    max_version_filter = None
    if self.config.to_commit_version is not None:
        max_version_filter = f"version <= {self.config.to_commit_version}"
    max_commit_version = _get_commit_version(
        self._query(history_query),
        max_version_filter,
    )
    if min_commit_version is None or max_commit_version is None:
        raise RuntimeError(f"No valid versions found for Table [ '{self.table_identifier}' ].")

    # Handle cases based on metadata
    if metadata_df.isEmpty():
        # Case 1: No metadata found, read all versions (first delta load)
        self._console_logger.info("No CDF History for this identifier, reading all versions.")
        commit_tuple = (min_commit_version, max_commit_version)
        self._console_logger.info(f"Reading Versions: {commit_tuple}")
        return commit_tuple

    start_commit_row = metadata_df.agg(F.max("end_commit_version")).first()
    start_commit_version = start_commit_row[0] if start_commit_row is not None else None
    if start_commit_version is None:
        # Case 2: No processed version found in metadata, treat as no metadata
        self._console_logger.info("No processed version found in metadata, reading all versions.")
        commit_tuple = (min_commit_version, max_commit_version)
        self._console_logger.info(f"Reading Versions: {commit_tuple}")
        return commit_tuple

    if start_commit_version > max_commit_version:
        # Case 3: Last processed version in metadata is greater than last version in table history
        # This can happen if the table is recreated after the last processed version
        raise RuntimeError(
            f"Table ['{self.table_identifier}'] history and CDF metadata are incompatible. "
            "Either reset the CDF metadata and recreate the target table from scratch,"
            "or repair CDF metadata."
        )

    if min_commit_version > start_commit_version:
        # Case 4: First version in table history is greater than last processed version in metadata
        # This can happen if the table is truncated after the last processed version
        self._console_logger.info("The first version in Table history is greater than the last processed version.")
        commit_tuple = (min_commit_version, max_commit_version)
        self._console_logger.info(f"Reading Versions: {commit_tuple}")
        return commit_tuple

    # Case 5: Normal case, read from last processed version to last available version
    self._console_logger.info("Reading from the last processed version to the last available version.")
    commit_tuple = (start_commit_version, max_commit_version)
    self._console_logger.info(f"Reading Versions: {commit_tuple}")
    return commit_tuple

_has_valid_metadata()

Checks if valid (i.e. non-stale) metadata exists for the delta load.

Source code in src/cloe_nessy/integration/delta_loader/strategies/delta_cdf_loader.py
def _has_valid_metadata(self) -> bool:
    """Checks if valid (i.e. non-stale) metadata exists for the delta load."""
    try:
        df = self._spark.sql(f"""
            SELECT * FROM {self.metadata_table_identifier}
            WHERE source_table_identifier = '{self.table_identifier}'
            AND delta_load_identifier = '{self.delta_load_identifier}'
            AND is_processed = true
            AND is_stale = false
        """)
        return not df.isEmpty()
    except Exception as e:
        self._console_logger.warning(f"Error accessing metadata table: {e}")
        return False

read_data(options=None)

Reads data using the CDF strategy.

Parameters:

options (dict[str, str] | None): Additional DataFrameReader options. Defaults to None.
Source code in src/cloe_nessy/integration/delta_loader/strategies/delta_cdf_loader.py
def read_data(
    self,
    options: dict[str, str] | None = None,
) -> DataFrame:
    """Reads data using the CDF strategy.

    Args:
        options: Additional DataFrameReader options.
    """
    self.verify()
    options = options or {}
    do_full_load = self.config.enable_full_load and not self._has_valid_metadata()

    if do_full_load:
        return self._full_load(options)

    return self._delta_load(options)

verify()

Verify that the source table has the Change Data Feed enabled.

Source code in src/cloe_nessy/integration/delta_loader/strategies/delta_cdf_loader.py
def verify(self) -> None:
    """Verify that the source table has the Change Data Feed enabled."""
    self._console_logger.info("Verifying table is enabled for Change Data Feed.")
    if not self._check_cdf_enabled(self.table_identifier):
        raise RuntimeError(f"Table {self.table_identifier} is not enabled for Change Data Feed.")

DeltaLoadOptions

Bases: BaseModel

Options to configure the DeltaLoader.

Parameters:

strategy (str): Delta load strategy to use. Required.

delta_load_identifier (str): Unique delta load identifier used to track the delta load metadata. Required.

strategy_options (dict): Options used to configure the chosen delta load strategy. See the config class of the particular strategy for more information. Required.

metadata_table_identifier (str | None): Identifier of the metadata table used to keep track of the delta load metadata. The table will be created if it does not exist. If None, it defaults to <source_catalog>.<source_schema>.metadata_delta_load. Defaults to None.
Source code in src/cloe_nessy/integration/delta_loader/delta_load_options.py
class DeltaLoadOptions(BaseModel):
    """Options to configure the DeltaLoader.

    Args:
        strategy: Delta load strategy to use.
        delta_load_identifier: Unique delta load identifier used to track the delta load metadata.
        strategy_options: Options used to configure the chosen delta load strategy.
            See the config class of the particular strategy for more info.
        metadata_table_identifier: Identifier of the metadata table used to keep
            track of the delta load metadata. The table will be created if it does
            not exist. If none, it will default to `<source_catalog>.<source_schema>.metadata_delta_load`.
    """

    strategy: str
    delta_load_identifier: str
    strategy_options: dict
    metadata_table_identifier: str | None = None

    @classmethod
    def from_yaml_str(cls, yaml_str: str) -> Self:
        """Creates an instance of DeltaLoadOptions from a YAML string."""
        options = yaml.safe_load(yaml_str)
        return cls(**options)

    @classmethod
    def from_file(cls, path: str | Path) -> Self:
        """Creates an instance of DeltaLoadOptions from a YAML file."""
        with Path(path).open() as f:
            yaml_str = f.read()
        return cls.from_yaml_str(yaml_str)
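
A sketch of loading options from YAML (the keys mirror the model fields; the strategy options shown follow DeltaCDFConfig, and all identifiers are hypothetical):

yaml_str = """
strategy: CDF
delta_load_identifier: orders_to_silver
strategy_options:
  deduplication_columns:
    - order_id
metadata_table_identifier: my_catalog.my_schema.metadata_delta_load
"""
options = DeltaLoadOptions.from_yaml_str(yaml_str)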

from_file(path) classmethod

Creates an instance of DeltaLoadOptions from a YAML file.

Source code in src/cloe_nessy/integration/delta_loader/delta_load_options.py
@classmethod
def from_file(cls, path: str | Path) -> Self:
    """Creates an instance of DeltaLoadOptions from a YAML file."""
    with Path(path).open() as f:
        yaml_str = f.read()
    return cls.from_yaml_str(yaml_str)

from_yaml_str(yaml_str) classmethod

Creates an instance of DeltaLoadOptions from a YAML string.

Source code in src/cloe_nessy/integration/delta_loader/delta_load_options.py
@classmethod
def from_yaml_str(cls, yaml_str: str) -> Self:
    """Creates an instance of DeltaLoadOptions from a YAML string."""
    options = yaml.safe_load(yaml_str)
    return cls(**options)

DeltaLoader

Bases: ABC, LoggerMixin

Base class for delta load operations.

Parameters:

table_identifier (str): Identifier for the table to be loaded. Required.

delta_load_identifier (str): Identifier for the delta load. Required.

metadata_table_identifier (str | None): Identifier for the metadata table. If None, it is derived from the table identifier: <table_catalog>.<table_schema>.metadata_delta_load. Defaults to None.
Source code in src/cloe_nessy/integration/delta_loader/delta_loader.py
class DeltaLoader(ABC, LoggerMixin):
    """Base class for delta load operations.

    Args:
        table_identifier: Identifier for the table to be loaded.
        delta_load_identifier: Identifier for the delta load.
        metadata_table_identifier: Identifier for the metadata table. If None,
            the metadata_table_identifier will be derived from the table identifier:
            `<table_catalog>.<table_schema>.metadata_delta_load`.
    """

    def __init__(
        self,
        table_identifier: str,
        delta_load_identifier: str,
        metadata_table_identifier: str | None = None,
    ):
        self._spark = SessionManager.get_spark_session()
        self._console_logger = self.get_console_logger()
        self.table_identifier = table_identifier
        self.delta_load_identifier = delta_load_identifier
        self.metadata_table_identifier = (
            metadata_table_identifier
            or f"{self.table_identifier.split('.')[0]}.{self.table_identifier.split('.')[1]}.metadata_delta_load"
        )
        table_manager = TableManager()
        table_manager.create_table(table=DeltaLoaderMetadataTable(identifier=self.metadata_table_identifier))

    @abstractmethod
    def read_data(
        self,
        options: dict[str, str] | None = None,
    ) -> DataFrame:
        """Reads data incrementally using a strategy.

        Args:
            options: Additional DataFrameReader options.
        """
        pass

    @abstractmethod
    def verify(self) -> None:
        """Verify that the source table qualifies for the delta load strategy."""
        pass

    def _query(self, query: str) -> DataFrame:
        df = self._spark.sql(query)
        return df

    def _create_metadata_entry(
        self,
        *,
        rows: int,
        last_read_timestamp: datetime | None = None,
        start_version: int | None = None,
        end_version: int | None = None,
        start_commit_timestamp: datetime | None = None,
        end_commit_timestamp: datetime | None = None,
    ) -> None:
        """Creates an entry in the metadata table for the delta load."""
        self._console_logger.info(
            f"Creating metadata entry for table: [ {self.table_identifier} ] with Delta Load Identifier: [ {self.delta_load_identifier} ]",
        )
        metadata_df = self._spark.range(1)
        metadata_df = metadata_df.select(
            F.lit(rows).alias("rows").cast("bigint"),
            F.lit(False).alias("is_processed"),
            F.lit(False).alias("is_stale"),
            F.lit(self.table_identifier).alias("source_table_identifier"),
            F.lit(self.delta_load_identifier).alias("delta_load_identifier"),
            F.lit(start_version).alias("start_commit_version"),
            F.lit(end_version).alias("end_commit_version"),
            F.lit(start_commit_timestamp).alias("start_commit_timestamp_utc"),
            F.lit(end_commit_timestamp).alias("end_commit_timestamp_utc"),
            F.lit(last_read_timestamp).alias("last_read_timestamp"),
            F.current_timestamp().alias("__DCR"),
            F.current_timestamp().alias("__DMR"),
        ).withColumn(
            "BK",
            F.md5(
                F.concat_ws(
                    "-",
                    F.col("source_table_identifier"),
                    F.col("delta_load_identifier"),
                    F.current_timestamp(),
                ),
            ),
        )
        catalog_writer = CatalogWriter()
        catalog_writer.write_table(
            df=metadata_df,
            table_identifier=self.metadata_table_identifier,
            mode="append",
        )

    def _invalidate_versions(self) -> None:
        """Invalidate any pending changes in the metadata for the delta load."""
        self._console_logger.info(
            f"Invalidating unprocessed delta load metadata for table: [ {self.table_identifier} ] with Delta Load Identifier: [ {self.delta_load_identifier} ]",
        )
        delta_table = DeltaTable.forName(self._spark, self.metadata_table_identifier)
        delta_table.update(
            condition=(F.col("source_table_identifier") == self.table_identifier)
            & (F.col("delta_load_identifier") == self.delta_load_identifier)
            & ~F.col("is_processed")
            & ~F.col("is_stale"),
            set={"is_stale": F.lit(True), "__DMR": F.current_timestamp()},
        )

    def reset_cdf(self) -> None:
        """Invalidates all changes in the metadata for the delta load."""
        delta_table = DeltaTable.forName(self._spark, self.metadata_table_identifier)
        self._console_logger.info(
            f"Resetting delta load metadata for table: [ {self.table_identifier} ] with Delta Load Identifier: [ {self.delta_load_identifier} ]",
        )
        delta_table.update(
            condition=(F.col("source_table_identifier") == self.table_identifier)
            & (F.col("delta_load_identifier") == self.delta_load_identifier)
            & ~F.col("is_stale"),
            set={"is_stale": F.lit(True), "__DMR": F.current_timestamp()},
        )

    def consume_data(self) -> None:
        """Marks data as consumed in the metadata for the delta load."""
        df = self._spark.table(self.metadata_table_identifier)
        df = df.filter(
            (F.col("source_table_identifier") == self.table_identifier)
            & (F.col("delta_load_identifier") == self.delta_load_identifier)
            & ~F.col("is_processed")
            & ~F.col("is_stale"),
        )
        df = df.groupBy("BK", "delta_load_identifier").agg(F.max("__DCR")).limit(1)
        self._console_logger.info(
            f"Mark metadata for table as processed: [ {self.table_identifier} ] with Delta Load Identifier: [ {self.delta_load_identifier} ].",
        )
        delta_table = DeltaTable.forName(self._spark, self.metadata_table_identifier)
        delta_table.alias("target").merge(df.alias("source"), "target.BK = source.BK").whenMatchedUpdate(
            set={
                "is_processed": F.lit(True),
                "__DMR": F.current_timestamp(),
            },
        ).execute()

    def write_data(self, write_callable: partial):
        """Wrapper to write and consume a delta load."""
        try:
            write_callable()
        except Exception as e:
            raise RuntimeError("Error while writing...") from e
        self.consume_data()
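
write_data takes a functools.partial wrapping the actual write and marks the load as consumed only if the write succeeds. A sketch reusing the loader from the DeltaCDFLoader example above; the writer call and target table are illustrative:

from functools import partial

df = loader.read_data()
loader.write_data(
    partial(
        df.write.format("delta").mode("append").saveAsTable,
        "my_catalog.my_schema.orders_silver",
    )
)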

_create_metadata_entry(*, rows, last_read_timestamp=None, start_version=None, end_version=None, start_commit_timestamp=None, end_commit_timestamp=None)

Creates an entry in the metadata table for the delta load.

Source code in src/cloe_nessy/integration/delta_loader/delta_loader.py
def _create_metadata_entry(
    self,
    *,
    rows: int,
    last_read_timestamp: datetime | None = None,
    start_version: int | None = None,
    end_version: int | None = None,
    start_commit_timestamp: datetime | None = None,
    end_commit_timestamp: datetime | None = None,
) -> None:
    """Creates an entry in the metadata table for the delta load."""
    self._console_logger.info(
        f"Creating metadata entry for table: [ {self.table_identifier} ] with Delta Load Identifier: [ {self.delta_load_identifier} ]",
    )
    metadata_df = self._spark.range(1)
    metadata_df = metadata_df.select(
        F.lit(rows).alias("rows").cast("bigint"),
        F.lit(False).alias("is_processed"),
        F.lit(False).alias("is_stale"),
        F.lit(self.table_identifier).alias("source_table_identifier"),
        F.lit(self.delta_load_identifier).alias("delta_load_identifier"),
        F.lit(start_version).alias("start_commit_version"),
        F.lit(end_version).alias("end_commit_version"),
        F.lit(start_commit_timestamp).alias("start_commit_timestamp_utc"),
        F.lit(end_commit_timestamp).alias("end_commit_timestamp_utc"),
        F.lit(last_read_timestamp).alias("last_read_timestamp"),
        F.current_timestamp().alias("__DCR"),
        F.current_timestamp().alias("__DMR"),
    ).withColumn(
        "BK",
        F.md5(
            F.concat_ws(
                "-",
                F.col("source_table_identifier"),
                F.col("delta_load_identifier"),
                F.current_timestamp(),
            ),
        ),
    )
    catalog_writer = CatalogWriter()
    catalog_writer.write_table(
        df=metadata_df,
        table_identifier=self.metadata_table_identifier,
        mode="append",
    )

_invalidate_versions()

Invalidate any pending changes in the metadata for the delta load.

Source code in src/cloe_nessy/integration/delta_loader/delta_loader.py
def _invalidate_versions(self) -> None:
    """Invalidate any pending changes in the metadata for the delta load."""
    self._console_logger.info(
        f"Invalidating unprocessed delta load metadata for table: [ {self.table_identifier} ] with Delta Load Identifier: [ {self.delta_load_identifier} ]",
    )
    delta_table = DeltaTable.forName(self._spark, self.metadata_table_identifier)
    delta_table.update(
        condition=(F.col("source_table_identifier") == self.table_identifier)
        & (F.col("delta_load_identifier") == self.delta_load_identifier)
        & ~F.col("is_processed")
        & ~F.col("is_stale"),
        set={"is_stale": F.lit(True), "__DMR": F.current_timestamp()},
    )

consume_data()

Marks data as consumed in the metadata for the delta load.

Source code in src/cloe_nessy/integration/delta_loader/delta_loader.py
def consume_data(self) -> None:
    """Marks data as consumed in the metadata for the delta load."""
    df = self._spark.table(self.metadata_table_identifier)
    df = df.filter(
        (F.col("source_table_identifier") == self.table_identifier)
        & (F.col("delta_load_identifier") == self.delta_load_identifier)
        & ~F.col("is_processed")
        & ~F.col("is_stale"),
    )
    df = df.groupBy("BK", "delta_load_identifier").agg(F.max("__DCR")).limit(1)
    self._console_logger.info(
        f"Mark metadata for table as processed: [ {self.table_identifier} ] with Delta Load Identifier: [ {self.delta_load_identifier} ].",
    )
    delta_table = DeltaTable.forName(self._spark, self.metadata_table_identifier)
    delta_table.alias("target").merge(df.alias("source"), "target.BK = source.BK").whenMatchedUpdate(
        set={
            "is_processed": F.lit(True),
            "__DMR": F.current_timestamp(),
        },
    ).execute()

read_data(options=None) abstractmethod

Reads data incrementally using a strategy.

Parameters:

options (dict[str, str] | None): Additional DataFrameReader options. Defaults to None.
Source code in src/cloe_nessy/integration/delta_loader/delta_loader.py
@abstractmethod
def read_data(
    self,
    options: dict[str, str] | None = None,
) -> DataFrame:
    """Reads data incrementally using a strategy.

    Args:
        options: Additional DataFrameReader options.
    """
    pass

reset_cdf()

Invalidates all changes in the metadata for the delta load.

Source code in src/cloe_nessy/integration/delta_loader/delta_loader.py
def reset_cdf(self) -> None:
    """Invalidates all changes in the metadata for the delta load."""
    delta_table = DeltaTable.forName(self._spark, self.metadata_table_identifier)
    self._console_logger.info(
        f"Resetting delta load metadata for table: [ {self.table_identifier} ] with Delta Load Identifier: [ {self.delta_load_identifier} ]",
    )
    delta_table.update(
        condition=(F.col("source_table_identifier") == self.table_identifier)
        & (F.col("delta_load_identifier") == self.delta_load_identifier)
        & ~F.col("is_stale"),
        set={"is_stale": F.lit(True), "__DMR": F.current_timestamp()},
    )

verify() abstractmethod

Verify that the source table qualifies for the delta load strategy.

Source code in src/cloe_nessy/integration/delta_loader/delta_loader.py
@abstractmethod
def verify(self) -> None:
    """Verify that the source table qualifies for the delta load strategy."""
    pass

write_data(write_callable)

Wrapper to write and consume a delta load.

Source code in src/cloe_nessy/integration/delta_loader/delta_loader.py
def write_data(self, write_callable: partial):
    """Wrapper to write and consume a delta load."""
    try:
        write_callable()
    except Exception as e:
        raise RuntimeError("Error while writing...") from e
    self.consume_data()

DeltaLoaderFactory

Factory to create a DeltaLoader instance based on the DeltaLoadOptions.

Source code in src/cloe_nessy/integration/delta_loader/delta_loader_factory.py
class DeltaLoaderFactory:
    """Factory to create a DeltaLoader instance based on the DeltaLoadOptions."""

    @staticmethod
    def create_loader(table_identifier: str, options: DeltaLoadOptions) -> DeltaLoader:
        """Creates an instance of DeltaLoader, choosing the desired strategy."""
        if options.strategy.upper() == "CDF":
            cdf_config = DeltaCDFConfig(**options.strategy_options)
            return DeltaCDFLoader(
                table_identifier=table_identifier,
                delta_load_identifier=options.delta_load_identifier,
                config=cdf_config,
                metadata_table_identifier=options.metadata_table_identifier,
            )
        if options.strategy.upper() == "TIMESTAMP":
            timestamp_config = DeltaTimestampConfig(**options.strategy_options)
            return DeltaTimestampLoader(
                table_identifier=table_identifier,
                delta_load_identifier=options.delta_load_identifier,
                config=timestamp_config,
                metadata_table_identifier=options.metadata_table_identifier,
            )
        raise ValueError(f"Unknown strategy: {options.strategy}")
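
A sketch of building a loader through the factory (identifiers are hypothetical; the strategy options follow DeltaCDFConfig):

options = DeltaLoadOptions(
    strategy="CDF",
    delta_load_identifier="orders_to_silver",
    strategy_options={"deduplication_columns": ["order_id"]},
)
loader = DeltaLoaderFactory.create_loader(
    table_identifier="my_catalog.my_schema.orders",
    options=options,
)
changes_df = loader.read_data()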

create_loader(table_identifier, options) staticmethod

Creates an instance of DeltaLoader, choosing the desired strategy.

Source code in src/cloe_nessy/integration/delta_loader/delta_loader_factory.py
@staticmethod
def create_loader(table_identifier: str, options: DeltaLoadOptions) -> DeltaLoader:
    """Creates an instance of DeltaLoader, choosing the desired strategy."""
    if options.strategy.upper() == "CDF":
        cdf_config = DeltaCDFConfig(**options.strategy_options)
        return DeltaCDFLoader(
            table_identifier=table_identifier,
            delta_load_identifier=options.delta_load_identifier,
            config=cdf_config,
            metadata_table_identifier=options.metadata_table_identifier,
        )
    if options.strategy.upper() == "TIMESTAMP":
        timestamp_config = DeltaTimestampConfig(**options.strategy_options)
        return DeltaTimestampLoader(
            table_identifier=table_identifier,
            delta_load_identifier=options.delta_load_identifier,
            config=timestamp_config,
            metadata_table_identifier=options.metadata_table_identifier,
        )
    raise ValueError(f"Unknown strategy: {options.strategy}")

DeltaTimestampConfig

Bases: BaseModel

This class holds the config for the DeltaTimestampLoader.

Parameters:

timestamp_filter_cols (list[str | Column]): A list of columns used for timestamp filtering. Required.

from_timestamp (datetime | None): The starting timestamp. If None, it starts from the beginning. Defaults to None.

to_timestamp (datetime | None): The ending timestamp. If None, it goes up to the latest timestamp. Defaults to None.

filter_method (str | None): The method used for filtering when multiple timestamp columns are used. Allowed values are '||', '&&', 'OR', 'AND'. Defaults to None.
Source code in src/cloe_nessy/integration/delta_loader/strategies/delta_timestamp_loader.py
class DeltaTimestampConfig(BaseModel):
    """This class holds the config for the DeltaTimestampLoader.

    Args:
        timestamp_filter_cols: A list of columns used for timestamp filtering.
        from_timestamp: The starting timestamp. If None, it starts from the beginning.
        to_timestamp: The ending timestamp. If None, it goes up to the latest timestamp.
        filter_method: The method used for filtering when multiple timestamp
            columns are used. Allowed values are '||', '&&', 'OR', 'AND'. Defaults
            to None.
    """

    timestamp_filter_cols: list[str | Column]
    from_timestamp: datetime | None = Field(default=None)
    to_timestamp: datetime | None = Field(default=None)
    filter_method: str | None = Field(default=None)

    @field_validator("from_timestamp", "to_timestamp", mode="before")
    @classmethod
    def parse_datetime(cls, value):
        """Parses datetime input.

        If a string is passed, it is expected to be in ISO 8601 format.
        """
        if isinstance(value, str):
            return datetime.fromisoformat(value)
        return value

    @field_validator("filter_method", mode="before")
    @classmethod
    def parse_filter_method(cls, value):
        """Parses and validates filter_method input."""
        value = value.upper()
        match value:
            case "OR":
                value = "||"
            case "AND":
                value = "&&"
            case "||" | "&&":
                # Valid filter methods, do nothing
                pass
            case _:
                raise ValueError("Invalid filter method. Allowed values are '||', '&&', 'OR', 'AND'.")
        return value

    @model_validator(mode="after")
    def check_filter_method(self):
        """Validates that a filter method is set, when more than one timestamp col is used."""
        if len(self.timestamp_filter_cols) > 1 and self.filter_method is None:
            raise ValueError("filter_method must be set when more than one timestamp_filter_cols is used.")
        return self
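
A minimal construction sketch (string timestamps are parsed as ISO 8601, and 'OR' is normalized to '||' by the validator; column names are hypothetical):

config = DeltaTimestampConfig(
    timestamp_filter_cols=["updated_at", "created_at"],
    from_timestamp="2024-01-01T00:00:00+00:00",
    filter_method="OR",  # required because more than one filter column is given
)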

check_filter_method()

Validates that a filter method is set, when more than one timestamp col is used.

Source code in src/cloe_nessy/integration/delta_loader/strategies/delta_timestamp_loader.py
@model_validator(mode="after")
def check_filter_method(self):
    """Validates that a filter method is set, when more than one timestamp col is used."""
    if len(self.timestamp_filter_cols) > 1 and self.filter_method is None:
        raise ValueError("filter_method must be set when more than one timestamp_filter_cols is used.")
    return self

parse_datetime(value) classmethod

Parses datetime input.

If a string is passed, it is expected to be in ISO 8601 format.

Source code in src/cloe_nessy/integration/delta_loader/strategies/delta_timestamp_loader.py
@field_validator("from_timestamp", "to_timestamp", mode="before")
@classmethod
def parse_datetime(cls, value):
    """Parses datetime input.

    If a string is passed, it is expected to be in ISO 8601 format.
    """
    if isinstance(value, str):
        return datetime.fromisoformat(value)
    return value

parse_filter_method(value) classmethod

Parses and validates filter_method input.

Source code in src/cloe_nessy/integration/delta_loader/strategies/delta_timestamp_loader.py
@field_validator("filter_method", mode="before")
@classmethod
def parse_filter_method(cls, value):
    """Parses and validates filter_method input."""
    value = value.upper()
    match value:
        case "OR":
            value = "||"
        case "AND":
            value = "&&"
        case "||" | "&&":
            # Valid filter methods, do nothing
            pass
        case _:
            raise ValueError("Invalid filter method. Allowed values are '||', '&&', 'OR', 'AND'.")
    return value

DeltaTimestampLoader

Bases: DeltaLoader

Implementation of the DeltaLoader interface using timestamp strategy.

Parameters:

config (DeltaTimestampConfig): Configuration for the DeltaTimestampLoader. Required.

table_identifier (str): Identifier for the table to be loaded. Required.

delta_load_identifier (str): Identifier for the delta load. Required.

metadata_table_identifier (str | None): Identifier for the metadata table. Defaults to None.
Source code in src/cloe_nessy/integration/delta_loader/strategies/delta_timestamp_loader.py
class DeltaTimestampLoader(DeltaLoader):
    """Implementation of the DeltaLoader interface using timestamp strategy.

    Args:
        config: Configuration for the DeltaTimestampLoader.
        table_identifier: Identifier for the table to be loaded.
        delta_load_identifier: Identifier for the delta load.
        metadata_table_identifier: Identifier for the metadata table. Defaults to None.
    """

    def __init__(
        self,
        config: DeltaTimestampConfig,
        table_identifier: str,
        delta_load_identifier: str,
        metadata_table_identifier: str | None = None,
    ):
        super().__init__(
            table_identifier,
            delta_load_identifier,
            metadata_table_identifier,
        )
        self.config = config
        self.table_reader = self._spark.read
        self.catalog_writer = CatalogWriter()

    def _get_last_timestamp(self) -> datetime:
        """Retrieves last read timestamp for delta load."""
        self._console_logger.info(f"Fetchin last read timestamp for table [ '{self.table_identifier}' ].")
        df = self.table_reader.table(self.metadata_table_identifier)
        row = (
            df.filter(
                (F.col("source_table_identifier") == self.table_identifier)
                & (F.col("delta_load_identifier") == self.delta_load_identifier)
                & F.col("is_processed")
                & ~F.col("is_stale"),
            )
            .agg(F.max("last_read_timestamp"))
            .first()
        )
        last_timestamp = row[0] if row is not None else None
        if last_timestamp is None:
            return datetime.fromtimestamp(0)
        return cast(datetime, last_timestamp)

    def verify(self) -> None:
        """Verify that the source table has the Change Data Feed enabled."""
        self._console_logger.info("Verifying that table has all configured timestamp columns.")
        df = self._spark.read.table(self.table_identifier)
        missing_columns = [col for col in self.config.timestamp_filter_cols if col not in df.columns]
        if missing_columns:
            raise RuntimeError(
                f"Timestamp filter Columns not found in Table {self.table_identifier} : {', '.join(str(col) for col in missing_columns)}.",
            )

    def read_data(
        self,
        options: dict[str, str] | None = None,
    ) -> DataFrame:
        """Reads data using the Timestamp strategy.

        Args:
            options: Additional DataFrameReader options.
        """
        if options is None:
            options = {}

        last_read_timestamp = self.config.to_timestamp or datetime.now(UTC)

        from_timestamp = self._get_last_timestamp()
        if self.config.from_timestamp and self.config.from_timestamp > from_timestamp:
            from_timestamp = self.config.from_timestamp
        self._invalidate_versions()

        self.table_reader.options(**options)
        df = self.table_reader.table(self.table_identifier)
        if from_timestamp != datetime.fromtimestamp(0):
            df = df.filter(
                f" {self.config.filter_method} ".join(
                    [f"{col} >= '{from_timestamp.isoformat()}'" for col in self.config.timestamp_filter_cols],
                ),
            )
        if last_read_timestamp == from_timestamp:
            # to avoid reading multiple times
            df = df.limit(0)
        else:
            df = df.filter(
                f" {self.config.filter_method} ".join(
                    [f"{col} < '{last_read_timestamp.isoformat()}'" for col in self.config.timestamp_filter_cols],
                ),
            )

        self._create_metadata_entry(
            rows=df.count(),
            last_read_timestamp=last_read_timestamp,
        )

        return df
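
Usage sketch with hypothetical identifiers, reusing the config from the DeltaTimestampConfig example above:

loader = DeltaTimestampLoader(
    config=config,
    table_identifier="my_catalog.my_schema.orders",
    delta_load_identifier="orders_ts_load",
)
df = loader.read_data()
loader.consume_data()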

_get_last_timestamp()

Retrieves last read timestamp for delta load.

Source code in src/cloe_nessy/integration/delta_loader/strategies/delta_timestamp_loader.py
def _get_last_timestamp(self) -> datetime:
    """Retrieves last read timestamp for delta load."""
    self._console_logger.info(f"Fetchin last read timestamp for table [ '{self.table_identifier}' ].")
    df = self.table_reader.table(self.metadata_table_identifier)
    row = (
        df.filter(
            (F.col("source_table_identifier") == self.table_identifier)
            & (F.col("delta_load_identifier") == self.delta_load_identifier)
            & F.col("is_processed")
            & ~F.col("is_stale"),
        )
        .agg(F.max("last_read_timestamp"))
        .first()
    )
    last_timestamp = row[0] if row is not None else None
    if last_timestamp is None:
        return datetime.fromtimestamp(0)
    return cast(datetime, last_timestamp)

read_data(options=None)

Reads data using the Timestamp strategy.

Parameters:

options (dict[str, str] | None): Additional DataFrameReader options. Defaults to None.
Source code in src/cloe_nessy/integration/delta_loader/strategies/delta_timestamp_loader.py
def read_data(
    self,
    options: dict[str, str] | None = None,
) -> DataFrame:
    """Reads data using the Timestamp strategy.

    Args:
        options: Additional DataFrameReader options.
    """
    if options is None:
        options = {}

    last_read_timestamp = self.config.to_timestamp or datetime.now(UTC)

    from_timestamp = self._get_last_timestamp()
    if self.config.from_timestamp and self.config.from_timestamp > from_timestamp:
        from_timestamp = self.config.from_timestamp
    self._invalidate_versions()

    self.table_reader.options(**options)
    df = self.table_reader.table(self.table_identifier)
    if from_timestamp != datetime.fromtimestamp(0):
        df = df.filter(
            f" {self.config.filter_method} ".join(
                [f"{col} >= '{from_timestamp.isoformat()}'" for col in self.config.timestamp_filter_cols],
            ),
        )
    if last_read_timestamp == from_timestamp:
        # to avoid reading multiple times
        df = df.limit(0)
    else:
        df = df.filter(
            f" {self.config.filter_method} ".join(
                [f"{col} < '{last_read_timestamp.isoformat()}'" for col in self.config.timestamp_filter_cols],
            ),
        )

    self._create_metadata_entry(
        rows=df.count(),
        last_read_timestamp=last_read_timestamp,
    )

    return df

verify()

Verify that the source table has all configured timestamp columns.

Source code in src/cloe_nessy/integration/delta_loader/strategies/delta_timestamp_loader.py
def verify(self) -> None:
    """Verify that the source table has the Change Data Feed enabled."""
    self._console_logger.info("Verifying that table has all configured timestamp columns.")
    df = self._spark.read.table(self.table_identifier)
    missing_columns = [col for col in self.config.timestamp_filter_cols if col not in df.columns]
    if missing_columns:
        raise RuntimeError(
            f"Timestamp filter Columns not found in Table {self.table_identifier} : {', '.join(str(col) for col in missing_columns)}.",
        )

consume_delta_load(runtime_info, delta_load_identifier=None)

Consumes a delta load by updating the metadata table.

Parameters:

runtime_info (dict[str, Any]): Runtime information containing the delta load options per source table. Required.

delta_load_identifier (str | None): If set, the ConsumeDeltaLoadAction will only consume DeltaLoader transactions for the given delta_load_identifier. Defaults to None.
Source code in src/cloe_nessy/integration/delta_loader/delta_loader_factory.py
def consume_delta_load(
    runtime_info: dict[str, Any],
    delta_load_identifier: str | None = None,
) -> None:
    """Consumes a delta load by updating the metadata table.

    Args:
        runtime_info: Runtime information.
        delta_load_identifier: If set, the ConsumeDeltaLoadAction action
            will only consume DeltaLoader transaction for the given
            delta_load_identifier.
    """
    for table_name, value in runtime_info["delta_load_options"].items():
        if delta_load_identifier is None or delta_load_identifier == value.get("delta_load_identifier"):
            delta_loader: DeltaLoader = DeltaLoaderFactory.create_loader(
                table_identifier=table_name,
                options=DeltaLoadOptions(
                    **value,
                ),
            )
            delta_loader.consume_data()
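
A sketch of the expected runtime_info shape (the inner dictionaries are unpacked into DeltaLoadOptions; table name and option values are hypothetical):

runtime_info = {
    "delta_load_options": {
        "my_catalog.my_schema.orders": {
            "strategy": "CDF",
            "delta_load_identifier": "orders_to_silver",
            "strategy_options": {"deduplication_columns": ["order_id"]},
        },
    },
}
consume_delta_load(runtime_info, delta_load_identifier="orders_to_silver")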