Skip to content

delta_loader_factory

DeltaLoaderFactory

Factory to create a DeltaLoader instance based on the DeltaLoadOptions.

Source code in src/cloe_nessy/integration/delta_loader/delta_loader_factory.py
class DeltaLoaderFactory:
    """Builds the DeltaLoader implementation selected by a DeltaLoadOptions object."""

    @staticmethod
    def create_loader(table_identifier: str, options: DeltaLoadOptions) -> DeltaLoader:
        """Return a loader for ``table_identifier`` using the strategy named in ``options``.

        The strategy name is upper-cased before it is compared against
        "CDF" and "TIMESTAMP". ``options.strategy_options`` is unpacked as
        keyword arguments into the config class of the selected strategy.

        Args:
            table_identifier: Forwarded to the loader as ``table_identifier``.
            options: Supplies ``strategy``, ``strategy_options``,
                ``delta_load_identifier`` and ``metadata_table_identifier``.

        Returns:
            A DeltaCDFLoader for "CDF" or a DeltaTimestampLoader for "TIMESTAMP".

        Raises:
            ValueError: If the strategy is neither "CDF" nor "TIMESTAMP".
        """
        strategy_name = options.strategy.upper()

        # Reject unsupported strategies up front so the rest reads linearly.
        if strategy_name not in ("CDF", "TIMESTAMP"):
            raise ValueError(f"Unknown strategy: {options.strategy}")

        if strategy_name == "CDF":
            loader_config = DeltaCDFConfig(**options.strategy_options)
            loader_cls = DeltaCDFLoader
        else:
            loader_config = DeltaTimestampConfig(**options.strategy_options)
            loader_cls = DeltaTimestampLoader

        # Both loader types take the same keyword arguments.
        return loader_cls(
            table_identifier=table_identifier,
            delta_load_identifier=options.delta_load_identifier,
            config=loader_config,
            metadata_table_identifier=options.metadata_table_identifier,
        )

create_loader(table_identifier, options) staticmethod

Creates an instance of DeltaLoader, choosing the desired strategy.

Source code in src/cloe_nessy/integration/delta_loader/delta_loader_factory.py
@staticmethod
def create_loader(table_identifier: str, options: DeltaLoadOptions) -> DeltaLoader:
    """Return a loader for ``table_identifier`` using the strategy named in ``options``.

    The strategy name is upper-cased before it is compared against
    "CDF" and "TIMESTAMP". ``options.strategy_options`` is unpacked as
    keyword arguments into the config class of the selected strategy.

    Args:
        table_identifier: Forwarded to the loader as ``table_identifier``.
        options: Supplies ``strategy``, ``strategy_options``,
            ``delta_load_identifier`` and ``metadata_table_identifier``.

    Returns:
        A DeltaCDFLoader for "CDF" or a DeltaTimestampLoader for "TIMESTAMP".

    Raises:
        ValueError: If the strategy is neither "CDF" nor "TIMESTAMP".
    """
    strategy_name = options.strategy.upper()

    # Reject unsupported strategies up front so the rest reads linearly.
    if strategy_name not in ("CDF", "TIMESTAMP"):
        raise ValueError(f"Unknown strategy: {options.strategy}")

    if strategy_name == "CDF":
        loader_config = DeltaCDFConfig(**options.strategy_options)
        loader_cls = DeltaCDFLoader
    else:
        loader_config = DeltaTimestampConfig(**options.strategy_options)
        loader_cls = DeltaTimestampLoader

    # Both loader types take the same keyword arguments.
    return loader_cls(
        table_identifier=table_identifier,
        delta_load_identifier=options.delta_load_identifier,
        config=loader_config,
        metadata_table_identifier=options.metadata_table_identifier,
    )

consume_delta_load(runtime_info, delta_load_identifier=None)

Consumes a delta load by updating the metadata table.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| runtime_info | dict[str, Any] | Runtime information. | required |
| delta_load_identifier | str \| None | If set, the ConsumeDeltaLoadAction action only consumes DeltaLoader transactions for the given delta_load_identifier. | None |

Source code in src/cloe_nessy/integration/delta_loader/delta_loader_factory.py
def consume_delta_load(
    runtime_info: dict[str, Any],
    delta_load_identifier: str | None = None,
) -> None:
    """Consumes a delta load by updating the metadata table.

    Args:
        runtime_info: Runtime information.
        delta_load_identifier: If set, the ConsumeDeltaLoadAction action
            will only consume DeltaLoader transaction for the given
            delta_load_identifier.
    """
    for table_name, value in runtime_info["delta_load_options"].items():
        if delta_load_identifier is None or delta_load_identifier == value.get("delta_load_identifier"):
            delta_loader: DeltaLoader = DeltaLoaderFactory.create_loader(
                table_identifier=table_name,
                options=DeltaLoadOptions(
                    **value,
                ),
            )
            delta_loader.consume_data()