Skip to content

write_delta_append

WriteDeltaAppendAction

Bases: PipelineAction

This class implements an Append action for an ETL pipeline.

The WriteDeltaAppendAction appends a Dataframe to Delta Table.

Example
Write Delta Append:
    action: WRITE_DELTA_APPEND
    options:
        table_identifier: my_catalog.my_schema.my_table
        ignore_empty_df: false

Returns:

Type Description

None.

Source code in src/cloe_nessy/pipeline/actions/write_delta_append.py
class WriteDeltaAppendAction(PipelineAction):
    """This class implements an Append action for an ETL pipeline.

    The WriteDeltaAppendAction appends a Dataframe to Delta Table.

    Example:
        ```yaml
        Write Delta Append:
            action: WRITE_DELTA_APPEND
            options:
                table_identifier: my_catalog.my_schema.my_table
                ignore_empty_df: false
        ```

    Returns:
        None.
    """

    name: str = "WRITE_DELTA_APPEND"

    def run(
        self,
        context: PipelineContext,
        *,
        table_identifier: str | None = None,
        ignore_empty_df: bool = False,
        options: dict[str, Any] | None = None,
        **_: Any,
    ) -> PipelineContext:
        """Merge the dataframe into the delta table.

        Args:
            context: Context in which this Action is executed.
            table_identifier: The identifier of the table. If passed, the
                UC Adapter will be used to create a table object. Otherwise the Table
                object will be created from the table metadata in the context.
            ignore_empty_df: A flag indicating whether to ignore an empty source dataframe.
            options: Additional options for the append writer.

        Raises:
            ValueError: If the table does not exist.
            ValueError: If the data is not set in the pipeline context.
            ValueError: If the table metadata is empty.

        Returns:
            Pipeline Context
        """
        delta_append_writer = DeltaAppendWriter()

        if context.data is None:
            raise ValueError("Data is required for the append operation.")
        if context.table_metadata is None and table_identifier is None:
            raise ValueError("Table metadata or a table identifier are required for the append operation.")

        if table_identifier is not None:
            context.table_metadata = UnityCatalogAdapter().get_table_by_name(table_identifier)
        else:
            if context.table_metadata is None:
                raise ValueError("Table metadata is required.")

        if context.table_metadata is None:
            raise ValueError("Table metadata is required.")

        delta_append_writer.write(
            table_identifier=context.table_metadata.identifier,
            table_location=context.table_metadata.storage_path,
            data_frame=context.data,
            ignore_empty_df=ignore_empty_df,
            options=options,
        )

        runtime_info = getattr(context, "runtime_info", None)
        if runtime_info and runtime_info.get("is_delta_load"):
            consume_delta_load(runtime_info)

        return context.from_existing()

run(context, *, table_identifier=None, ignore_empty_df=False, options=None, **_)

Merge the dataframe into the delta table.

Parameters:

Name Type Description Default
context PipelineContext

Context in which this Action is executed.

required
table_identifier str | None

The identifier of the table. If passed, the UC Adapter will be used to create a table object. Otherwise the Table object will be created from the table metadata in the context.

None
ignore_empty_df bool

A flag indicating whether to ignore an empty source dataframe.

False
options dict[str, Any] | None

Additional options for the append writer.

None

Raises:

Type Description
ValueError

If the table does not exist.

ValueError

If the data is not set in the pipeline context.

ValueError

If the table metadata is empty.

Returns:

Type Description
PipelineContext

Pipeline Context

Source code in src/cloe_nessy/pipeline/actions/write_delta_append.py
def run(
    self,
    context: PipelineContext,
    *,
    table_identifier: str | None = None,
    ignore_empty_df: bool = False,
    options: dict[str, Any] | None = None,
    **_: Any,
) -> PipelineContext:
    """Merge the dataframe into the delta table.

    Args:
        context: Context in which this Action is executed.
        table_identifier: The identifier of the table. If passed, the
            UC Adapter will be used to create a table object. Otherwise the Table
            object will be created from the table metadata in the context.
        ignore_empty_df: A flag indicating whether to ignore an empty source dataframe.
        options: Additional options for the append writer.

    Raises:
        ValueError: If the table does not exist.
        ValueError: If the data is not set in the pipeline context.
        ValueError: If the table metadata is empty.

    Returns:
        Pipeline Context
    """
    delta_append_writer = DeltaAppendWriter()

    if context.data is None:
        raise ValueError("Data is required for the append operation.")
    if context.table_metadata is None and table_identifier is None:
        raise ValueError("Table metadata or a table identifier are required for the append operation.")

    if table_identifier is not None:
        context.table_metadata = UnityCatalogAdapter().get_table_by_name(table_identifier)
    else:
        if context.table_metadata is None:
            raise ValueError("Table metadata is required.")

    if context.table_metadata is None:
        raise ValueError("Table metadata is required.")

    delta_append_writer.write(
        table_identifier=context.table_metadata.identifier,
        table_location=context.table_metadata.storage_path,
        data_frame=context.data,
        ignore_empty_df=ignore_empty_df,
        options=options,
    )

    runtime_info = getattr(context, "runtime_info", None)
    if runtime_info and runtime_info.get("is_delta_load"):
        consume_delta_load(runtime_info)

    return context.from_existing()