write_catalog_table

WriteCatalogTableAction

Bases: PipelineAction

Writes a DataFrame to a specified catalog table using CatalogWriter.

Example

```yaml
Write Table to Catalog:
    action: WRITE_CATALOG_TABLE
    options:
        table_identifier: my_catalog.business_schema.sales_table
        mode: append
        partition_by: day
        options:
            mergeSchema: true
```
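
For orientation, the configuration above presumably reduces to a plain `DataFrameWriter.saveAsTable` call, as the `options` parameter documentation suggests. A minimal PySpark sketch of that mapping, not the actual `CatalogWriter` implementation:

```python
# Sketch only: assumes CatalogWriter ultimately delegates to
# DataFrameWriter.saveAsTable; the real implementation may differ.
from pyspark.sql import DataFrame


def write_sales(df: DataFrame) -> None:
    (
        df.write.mode("append")                # mode: append
        .partitionBy("day")                    # partition_by: day
        .option("mergeSchema", "true")         # options: {mergeSchema: true}
        .saveAsTable("my_catalog.business_schema.sales_table")
    )
```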
Source code in src/cloe_nessy/pipeline/actions/write_catalog_table.py

````python
class WriteCatalogTableAction(PipelineAction):
    """Writes a DataFrame to a specified catalog table using [CatalogWriter][cloe_nessy.integration.writer.CatalogWriter].

    Example:
        ```yaml
        Write Table to Catalog:
            action: WRITE_CATALOG_TABLE
            options:
                table_identifier: my_catalog.business_schema.sales_table
                mode: append
                partition_by: day
                options:
                    mergeSchema: true
        ```
    """

    name: str = "WRITE_CATALOG_TABLE"

    @staticmethod
    def run(
        context: PipelineContext,
        *,
        table_identifier: str | None = None,
        mode: str = "append",
        partition_by: str | list[str] | None = None,
        options: dict[str, str] | None = None,
        **_: Any,
    ) -> PipelineContext:
        """Writes a DataFrame to a specified catalog table.

        Args:
            context: Context in which this Action is executed.
            table_identifier: The table identifier in the Unity Catalog in the
                format 'catalog.schema.table'. If not provided, attempts to use the
                context's table metadata.
            mode: The write mode. One of 'append', 'overwrite', 'error',
                'errorifexists', or 'ignore'.
            partition_by: Names of the partitioning columns.
            options: PySpark options for the DataFrame.saveAsTable operation (e.g. mergeSchema: true).

        Raises:
            ValueError: If the table name is not specified or cannot be inferred from
                the context.

        Returns:
            Context after the execution of this Action.
        """
        if not options:
            options = dict()
        if partition_by is None and hasattr(context.table_metadata, "partition_by"):
            partition_by = context.table_metadata.partition_by  # type: ignore

        if (table_metadata := context.table_metadata) and table_identifier is None:
            table_identifier = table_metadata.identifier
        if table_identifier is None:
            raise ValueError("Table name must be specified or a valid Table object with identifier must be set.")

        runtime_info = getattr(context, "runtime_info", None)
        if runtime_info and runtime_info.get("is_delta_load"):
            consume_delta_load(runtime_info)

        writer = CatalogWriter()
        writer.write_table(
            df=context.data,  # type: ignore
            table_identifier=table_identifier,
            mode=mode,
            partition_by=partition_by,
            options=options,
        )
        return context.from_existing()
````
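
Because `run` is a static method, the action can also be invoked directly outside a pipeline definition. A minimal sketch, assuming a `PipelineContext` whose `data` attribute already holds the DataFrame to write (how the context is constructed is illustrative):

```python
# Illustrative call; how the PipelineContext is obtained is an assumption.
from cloe_nessy.pipeline.actions.write_catalog_table import WriteCatalogTableAction

result = WriteCatalogTableAction.run(
    context,  # PipelineContext carrying the DataFrame in `context.data`
    table_identifier="my_catalog.business_schema.sales_table",
    mode="overwrite",
    partition_by=["day"],
    options={"mergeSchema": "true"},
)
# A fresh context is returned via context.from_existing().
```

Note that if the context's `runtime_info` flags a delta load (`is_delta_load`), `run` consumes it via `consume_delta_load` before writing.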

run(context, *, table_identifier=None, mode='append', partition_by=None, options=None, **_) staticmethod

Writes a DataFrame to a specified catalog table.

Parameters:

| Name | Type | Description | Default |
| ---- | ---- | ----------- | ------- |
| `context` | `PipelineContext` | Context in which this Action is executed. | *required* |
| `table_identifier` | `str \| None` | The table identifier in the Unity Catalog, in the format `catalog.schema.table`. If not provided, attempts to use the context's table metadata. | `None` |
| `mode` | `str` | The write mode. One of `append`, `overwrite`, `error`, `errorifexists`, or `ignore`. | `'append'` |
| `partition_by` | `str \| list[str] \| None` | Names of the partitioning columns. | `None` |
| `options` | `dict[str, str] \| None` | PySpark options for the `DataFrame.saveAsTable` operation (e.g. `mergeSchema: true`). | `None` |

Raises:

| Type | Description |
| ---- | ----------- |
| `ValueError` | If the table name is not specified or cannot be inferred from the context. |

Returns:

| Type | Description |
| ---- | ----------- |
| `PipelineContext` | Context after the execution of this Action. |
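
The inference rules behind the `ValueError` are visible in the source: an explicit `table_identifier` wins, otherwise the identifier from `context.table_metadata` is used, and if neither is available the action raises. A short sketch of the failure path (`bare_context` is a hypothetical context with neither an identifier nor table metadata):

```python
# Hypothetical: bare_context carries no table_identifier and no table metadata.
try:
    WriteCatalogTableAction.run(bare_context)
except ValueError as err:
    print(err)  # Table name must be specified or a valid Table object with identifier must be set.
```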
