Skip to content

read_catalog_table

ReadCatalogTableAction

Bases: PipelineAction

Reads a table from Unity Catalog using a specified table identifier and optional reader configurations.

This action reads data from a catalog table using the CatalogReader. The table is identified either by the table_identifier parameter or, when that is omitted, by the table_metadata carried in the PipelineContext of a previous step. The retrieved data is loaded into a DataFrame and returned as part of an updated PipelineContext.

Example
Read Sales Table:
    action: READ_CATALOG_TABLE
    options:
        table_identifier: my_catalog.business_schema.sales_table
        options: <options for the CatalogReader read method>
    delta_load_options:
        strategy: CDF
        delta_load_identifier: my_delta_load_id
        strategy_options:
            deduplication_columns: ["id"]
            enable_full_load: true
Source code in src/cloe_nessy/pipeline/actions/read_catalog_table.py
class ReadCatalogTableAction(PipelineAction):
    """Reads a table from Unity Catalog using a specified table identifier and optional reader configurations.

    This action retrieves data from a catalog table using the
    [`CatalogReader`][cloe_nessy.integration.reader.catalog_reader]. The table
    is identified either by the `table_identifier` parameter or, when that is
    omitted, by the `table_metadata` from the provided `PipelineContext` of a
    previous step. The retrieved data is loaded into a DataFrame and returned
    as part of an updated `PipelineContext`.

    Example:
        ```yaml
        Read Sales Table:
            action: READ_CATALOG_TABLE
            options:
                table_identifier: my_catalog.business_schema.sales_table
                options: <options for the CatalogReader read method>
            delta_load_options:
                strategy: CDF
                delta_load_identifier: my_delta_load_id
                strategy_options:
                    deduplication_columns: ["id"]
                    enable_full_load: true
        ```
    """

    name: str = "READ_CATALOG_TABLE"

    @staticmethod
    def run(
        context: PipelineContext,
        *,
        table_identifier: str | None = None,
        options: dict[str, str] | None = None,
        delta_load_options: dict[Any, Any] | DeltaLoadOptions | None = None,
        **_: Any,  # define kwargs to match the base class signature
    ) -> PipelineContext:
        """Reads a table from Unity Catalog using a specified table identifier and optional reader configurations.

        Args:
            context: The pipeline's context, which contains
                metadata and configuration for the action.
            table_identifier: The identifier of the catalog table to
                read. If not provided, the function will attempt to use the table
                identifier from the `table_metadata` in the `context`.
            options: A dictionary of options for customizing
                the [`CatalogReader`][cloe_nessy.integration.reader.catalog_reader]
                behavior, such as filters or reading modes. Defaults to None,
                which is treated as an empty dictionary.
            delta_load_options: Options for delta loading, if applicable.
                Configures the [`DeltaLoader`][cloe_nessy.integration.delta_loader].
                May be given as a plain dictionary or as a `DeltaLoadOptions`
                instance; `None` or an empty dictionary disables delta loading.

        Raises:
            ValueError: If neither `table_identifier` nor `table_metadata.identifier` in the `context` is provided.

        Returns:
            An updated pipeline context containing the data read from the
            catalog table as a DataFrame.
        """
        reader_options = options or {}

        # An explicitly passed identifier wins; the context metadata is only a fallback.
        if (table_metadata := context.table_metadata) and table_identifier is None:
            table_identifier = table_metadata.identifier
        if table_identifier is None:
            raise ValueError("Table name must be specified or a valid Table object with identifier must be set.")

        # Two views of the same settings are needed: a plain dict for the
        # runtime info and a DeltaLoadOptions instance (or None) for the reader.
        if not delta_load_options:
            delta_options_dict = {}
            reader_delta_options = None
        elif isinstance(delta_load_options, dict):
            delta_options_dict = delta_load_options
            reader_delta_options = DeltaLoadOptions(**delta_load_options)
        else:
            delta_options_dict = delta_load_options.model_dump()
            reader_delta_options = delta_load_options

        runtime_info = set_delta_load_info(
            table_identifier=table_identifier,
            delta_load_options=delta_options_dict,
            runtime_info=context.runtime_info or {},
        )

        table_reader = CatalogReader()
        df = table_reader.read(
            table_identifier=table_identifier,
            options=reader_options,
            delta_load_options=reader_delta_options,
        )
        return context.from_existing(data=df, runtime_info=runtime_info)

run(context, *, table_identifier=None, options=None, delta_load_options=None, **_) staticmethod

Reads a table from Unity Catalog using a specified table identifier and optional reader configurations.

Parameters:

Name Type Description Default
context PipelineContext

The pipeline's context, which contains metadata and configuration for the action.

required
table_identifier str | None

The identifier of the catalog table to read. If not provided, the function will attempt to use the table identifier from the table_metadata in the context.

None
options dict[str, str] | None

A dictionary of options for customizing the CatalogReader behavior, such as filters or reading modes. Defaults to None.

None
delta_load_options dict[Any, Any] | DeltaLoadOptions | None

Options for delta loading, if applicable. Configures the DeltaLoader.

None

Raises:

Type Description
ValueError

If neither table_identifier nor table_metadata.identifier in the context is provided.

Returns: An updated pipeline context containing the data read from the catalog table as a DataFrame.

Source code in src/cloe_nessy/pipeline/actions/read_catalog_table.py
@staticmethod
def run(
    context: PipelineContext,
    *,
    table_identifier: str | None = None,
    options: dict[str, str] | None = None,
    delta_load_options: dict[Any, Any] | DeltaLoadOptions | None = None,
    **_: Any,  # define kwargs to match the base class signature
) -> PipelineContext:
    """Reads a table from Unity Catalog using a specified table identifier and optional reader configurations.

    Args:
        context: The pipeline's context, which contains
            metadata and configuration for the action.
        table_identifier: The identifier of the catalog table to
            read. If not provided, the function will attempt to use the table
            identifier from the `table_metadata` in the `context`.
        options: A dictionary of options for customizing
            the [`CatalogReader`][cloe_nessy.integration.reader.catalog_reader]
            behavior, such as filters or reading modes. Defaults to None,
            which is treated as an empty dictionary.
        delta_load_options: Options for delta loading, if applicable.
            Configures the [`DeltaLoader`][cloe_nessy.integration.delta_loader].
            May be given as a plain dictionary or as a `DeltaLoadOptions`
            instance; `None` or an empty dictionary disables delta loading.

    Raises:
        ValueError: If neither `table_identifier` nor `table_metadata.identifier` in the `context` is provided.

    Returns:
        An updated pipeline context containing the data read from the
        catalog table as a DataFrame.
    """
    reader_options = options or {}

    # An explicitly passed identifier wins; the context metadata is only a fallback.
    if (table_metadata := context.table_metadata) and table_identifier is None:
        table_identifier = table_metadata.identifier
    if table_identifier is None:
        raise ValueError("Table name must be specified or a valid Table object with identifier must be set.")

    # Two views of the same settings are needed: a plain dict for the
    # runtime info and a DeltaLoadOptions instance (or None) for the reader.
    if not delta_load_options:
        delta_options_dict = {}
        reader_delta_options = None
    elif isinstance(delta_load_options, dict):
        delta_options_dict = delta_load_options
        reader_delta_options = DeltaLoadOptions(**delta_load_options)
    else:
        delta_options_dict = delta_load_options.model_dump()
        reader_delta_options = delta_load_options

    runtime_info = set_delta_load_info(
        table_identifier=table_identifier,
        delta_load_options=delta_options_dict,
        runtime_info=context.runtime_info or {},
    )

    table_reader = CatalogReader()
    df = table_reader.read(
        table_identifier=table_identifier,
        options=reader_options,
        delta_load_options=reader_delta_options,
    )
    return context.from_existing(data=df, runtime_info=runtime_info)