DeltaAppendWriter

Bases: BaseDeltaWriter

A class for appending DataFrames to Delta tables.

Source code in src/cloe_nessy/integration/writer/delta_writer/delta_append_writer.py
class DeltaAppendWriter(BaseDeltaWriter):
    """A class for appending DataFrames to Delta tables."""

    def __init__(self):
        super().__init__()
        self._spark = SessionManager.get_spark_session()
        self._dbutils = SessionManager.get_utils()

    @table_log_decorator(operation="append")
    def write(
        self,
        table_identifier: str,
        table_location: str,
        data_frame: DataFrame,
        ignore_empty_df: bool = False,
        options: dict[str, str] | None = None,
    ):
        """Appends the provided DataFrame to a Delta table.

        Args:
            table_identifier: The identifier of the Delta table in the format 'catalog.schema.table'.
            table_location: The location of the Delta table.
            data_frame: The DataFrame to append to the table.
            ignore_empty_df: If True, the function returns early without
                doing anything if the DataFrame is empty.
            options: Additional options passed to the 'write' method of the
                FileWriter instance. These can be any options accepted by that
                method, such as 'checkpointLocation' for specifying the path
                where checkpoints will be stored, or 'path' for specifying the
                path where the output data will be written.
        """
        if self._empty_dataframe_check(data_frame, ignore_empty_df):
            return
        writer = FileWriter()
        writer.write(
            data_frame=data_frame,
            location=table_location,
            format="DELTA",
            mode="APPEND",
            options=options,
        )
        self._report_delta_table_operation_metrics(
            table_identifier=table_identifier, operation_type=DeltaTableOperationType.WRITE
        )

    @table_log_decorator(operation="stream_append")
    def write_stream(
        self,
        table_identifier: str,
        table_location: str,
        data_frame: DataFrame,
        checkpoint_location: str | None = None,
        trigger_dict: dict | None = None,
        options: dict[str, str] | None = None,
        await_termination: bool = False,
    ):
        """Appends the provided DataFrame to a Delta table.

        Args:
            table_identifier: The identifier of the Delta table in the format 'catalog.schema.table'.
            table_location: The location of the Delta table.
            data_frame: The DataFrame to append to the table.
            checkpoint_location: Location of the streaming checkpoint. If None,
                defaults to the location of the table being written, with
                '_checkpoint_' added before the table name.
            trigger_dict: A dictionary specifying the trigger configuration for the streaming query.
                Supported keys include:

                - "processingTime": Specifies a time interval (e.g., "10 seconds") for micro-batch processing.
                - "once": Processes all available data once and then stops.
                - "continuous": Specifies a time interval (e.g., "1 second") for continuous processing.
                - "availableNow": Processes all available data immediately and then stops.

                If nothing is provided, the default is {"availableNow": True}.
            options: Additional options passed to the 'write' method of the
                FileWriter instance. These can be any options accepted by that
                method for configuring the write operation.
            await_termination: If True, the function will wait for the streaming
                query to finish before returning. This is useful for ensuring that
                the data has been fully written before proceeding with other
                operations.

        Returns:
            None.
        """
        writer = FileWriter()
        writer.write_stream(
            data_frame=data_frame,
            location=table_location,
            format="DELTA",
            checkpoint_location=checkpoint_location,
            mode="APPEND",
            trigger_dict=trigger_dict,
            options=options,
        )
        self._report_delta_table_operation_metrics(
            table_identifier=table_identifier, operation_type=DeltaTableOperationType.WRITE
        )

write(table_identifier, table_location, data_frame, ignore_empty_df=False, options=None)

Appends the provided DataFrame to a Delta table.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `table_identifier` | `str` | The identifier of the Delta table in the format 'catalog.schema.table'. | *required* |
| `table_location` | `str` | The location of the Delta table. | *required* |
| `data_frame` | `DataFrame` | The DataFrame to append to the table. | *required* |
| `ignore_empty_df` | `bool` | If True, the function returns early without doing anything if the DataFrame is empty. | `False` |
| `options` | `dict[str, str] \| None` | Additional options passed to the 'write' method of the FileWriter instance. These can be any options accepted by that method, such as 'checkpointLocation' for specifying the path where checkpoints will be stored, or 'path' for specifying the path where the output data will be written. | `None` |
Source code in src/cloe_nessy/integration/writer/delta_writer/delta_append_writer.py
@table_log_decorator(operation="append")
def write(
    self,
    table_identifier: str,
    table_location: str,
    data_frame: DataFrame,
    ignore_empty_df: bool = False,
    options: dict[str, str] | None = None,
):
    """Appends the provided DataFrame to a Delta table.

    Args:
        table_identifier: The identifier of the Delta table in the format 'catalog.schema.table'.
        table_location: The location of the Delta table.
        data_frame: The DataFrame to append to the table.
        ignore_empty_df: If True, the function returns early without
            doing anything if the DataFrame is empty.
        options: Additional options passed to the 'write' method of the
            FileWriter instance. These can be any options accepted by that
            method, such as 'checkpointLocation' for specifying the path
            where checkpoints will be stored, or 'path' for specifying the
            path where the output data will be written.
    """
    if self._empty_dataframe_check(data_frame, ignore_empty_df):
        return
    writer = FileWriter()
    writer.write(
        data_frame=data_frame,
        location=table_location,
        format="DELTA",
        mode="APPEND",
        options=options,
    )
    self._report_delta_table_operation_metrics(
        table_identifier=table_identifier, operation_type=DeltaTableOperationType.WRITE
    )
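
Example: a minimal batch-append sketch. The catalog name, storage path, and DataFrame below are illustrative placeholders, and the import path is inferred from the source location shown above; adjust both to your environment.

from pyspark.sql import SparkSession

from cloe_nessy.integration.writer.delta_writer.delta_append_writer import DeltaAppendWriter

spark = SparkSession.builder.getOrCreate()
orders_df = spark.createDataFrame([(1, "widget", 3)], ["order_id", "item", "qty"])

writer = DeltaAppendWriter()
writer.write(
    table_identifier="main.sales.orders",  # 'catalog.schema.table' (placeholder)
    table_location="abfss://lake@account.dfs.core.windows.net/sales/orders",  # placeholder path
    data_frame=orders_df,
    ignore_empty_df=True,  # return early instead of failing on an empty DataFrame
    options={"mergeSchema": "true"},  # forwarded to FileWriter.write; key shown as an assumption
)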

write_stream(table_identifier, table_location, data_frame, checkpoint_location=None, trigger_dict=None, options=None, await_termination=False)

Appends the provided DataFrame to a Delta table as a streaming write.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `table_identifier` | `str` | The identifier of the Delta table in the format 'catalog.schema.table'. | *required* |
| `table_location` | `str` | The location of the Delta table. | *required* |
| `data_frame` | `DataFrame` | The DataFrame to append to the table. | *required* |
| `checkpoint_location` | `str \| None` | Location of the streaming checkpoint. If None, defaults to the location of the table being written, with '_checkpoint_' added before the table name. | `None` |
| `trigger_dict` | `dict \| None` | A dictionary specifying the trigger configuration for the streaming query. Supported keys: "processingTime" (a time interval, e.g. "10 seconds", for micro-batch processing); "once" (process all available data once, then stop); "continuous" (a time interval, e.g. "1 second", for continuous processing); "availableNow" (process all available data immediately, then stop). If nothing is provided, the default is {"availableNow": True}. | `None` |
| `options` | `dict[str, str] \| None` | Additional options passed to the 'write' method of the FileWriter instance. These can be any options accepted by that method for configuring the write operation. | `None` |
| `await_termination` | `bool` | If True, the function waits for the streaming query to finish before returning. This is useful for ensuring that the data has been fully written before proceeding with other operations. | `False` |

Returns:

None.

Source code in src/cloe_nessy/integration/writer/delta_writer/delta_append_writer.py
@table_log_decorator(operation="stream_append")
def write_stream(
    self,
    table_identifier: str,
    table_location: str,
    data_frame: DataFrame,
    checkpoint_location: str | None = None,
    trigger_dict: dict | None = None,
    options: dict[str, str] | None = None,
    await_termination: bool = False,
):
    """Appends the provided DataFrame to a Delta table.

    Args:
        table_identifier: The identifier of the Delta table in the format 'catalog.schema.table'.
        table_location: The location of the Delta table.
        data_frame: The DataFrame to append to the table.
        checkpoint_location: Location of the streaming checkpoint. If None,
            defaults to the location of the table being written, with
            '_checkpoint_' added before the table name.
        trigger_dict: A dictionary specifying the trigger configuration for the streaming query.
            Supported keys include:

            - "processingTime": Specifies a time interval (e.g., "10 seconds") for micro-batch processing.
            - "once": Processes all available data once and then stops.
            - "continuous": Specifies a time interval (e.g., "1 second") for continuous processing.
            - "availableNow": Processes all available data immediately and then stops.

            If nothing is provided, the default is {"availableNow": True}.
        options: Additional options passed to the 'write' method of the
            FileWriter instance. These can be any options accepted by that
            method for configuring the write operation.
        await_termination: If True, the function will wait for the streaming
            query to finish before returning. This is useful for ensuring that
            the data has been fully written before proceeding with other
            operations.

    Returns:
        None.
    """
    writer = FileWriter()
    writer.write_stream(
        data_frame=data_frame,
        location=table_location,
        format="DELTA",
        checkpoint_location=checkpoint_location,
        mode="APPEND",
        trigger_dict=trigger_dict,
        options=options,
    )
    self._report_delta_table_operation_metrics(
        table_identifier=table_identifier, operation_type=DeltaTableOperationType.WRITE
    )
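
Example: a streaming-append sketch using an availableNow trigger. The source stream, identifiers, and paths are illustrative placeholders; any streaming DataFrame works.

from pyspark.sql import SparkSession

from cloe_nessy.integration.writer.delta_writer.delta_append_writer import DeltaAppendWriter

spark = SparkSession.builder.getOrCreate()
# Placeholder source: any streaming DataFrame, e.g. an Auto Loader or rate stream.
orders_stream_df = spark.readStream.format("rate").load()

writer = DeltaAppendWriter()
writer.write_stream(
    table_identifier="main.sales.orders",  # placeholder identifier
    table_location="abfss://lake@account.dfs.core.windows.net/sales/orders",  # placeholder path
    data_frame=orders_stream_df,
    checkpoint_location="abfss://lake@account.dfs.core.windows.net/sales/_checkpoint_orders",
    trigger_dict={"availableNow": True},  # or {"processingTime": "10 seconds"} for micro-batches
    await_termination=True,  # block until all currently available data is written
)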

DeltaMergeWriter

Bases: BaseDeltaWriter

A class for merging DataFrames to Delta tables.

Source code in src/cloe_nessy/integration/writer/delta_writer/delta_merge_writer.py
class DeltaMergeWriter(BaseDeltaWriter):
    """A class for merging DataFrames to Delta tables."""

    def __init__(self):
        super().__init__()
        self._spark = SessionManager.get_spark_session()
        self._dbutils = SessionManager.get_utils()

    def _validate_table_inputs(
        self, table: Table | None, table_identifier: str | None, storage_path: str | None
    ) -> tuple[str, str]:
        """Validates and retrieves table identifier and storage path."""
        if table is None and (table_identifier is None or storage_path is None):
            raise ValueError("Either a Table object or table_identifier and storage_path must be provided.")
        if table is not None:
            table_identifier = table.identifier
            storage_path = str(table.storage_path)
        if not storage_path:
            raise ValueError("Storage path must be provided or extracted from the Table object.")
        assert table_identifier is not None, "Table identifier must be provided."
        return table_identifier, storage_path

    def _build_match_conditions(self, data_frame: DataFrame, config: DeltaMergeConfig) -> str:
        """Builds match conditions for the Delta table merge."""
        match_conditions = self._merge_match_conditions(config.key_columns)
        if config.use_partition_pruning:
            match_conditions_list = [match_conditions] + [
                self._partition_pruning_conditions(data_frame, config.partition_by),
            ]
            match_conditions = " AND ".join(match_conditions_list)
        return match_conditions

    def _build_merge_operations(
        self, delta_table, data_frame: DataFrame, config: DeltaMergeConfig, match_conditions: str
    ):
        """Builds the Delta table merge operations."""
        delta_table_merge = delta_table.alias("target").merge(
            source=data_frame.alias("source"),
            condition=match_conditions,
        )
        if config.when_matched_update:
            delta_table_merge = delta_table_merge.whenMatchedUpdate(set=config.final_cols_to_update)
        elif config.when_matched_delete:
            delta_table_merge = delta_table_merge.whenMatchedDelete()
        if config.when_not_matched_insert:
            delta_table_merge = delta_table_merge.whenNotMatchedInsert(values=config.final_cols_to_insert)
        return delta_table_merge

    @table_log_decorator(operation="merge")
    def write(
        self,
        data_frame: DataFrame,
        table: Table | None = None,
        table_identifier: str | None = None,
        storage_path: str | None = None,
        ignore_empty_df: bool = False,
        **kwargs: Any,
    ):
        """Merges the data in a spark DataFrame into a Delta table.

        This function performs a merge operation between a DataFrame and a Delta
        table. The function supports update, delete, and insert operations on
        the target Delta table based on conditions specified by the user. The
        function also supports partition pruning to optimize the performance of
        the merge operation.

        Args:
            table: The Table object representing the Delta table.
            table_identifier: The identifier of the Delta table in the format
                'catalog.schema.table'.
            storage_path: The location of the Delta table.
            data_frame: The DataFrame to be merged into the Delta table.
            ignore_empty_df: A flag indicating whether to ignore an empty source
                dataframe.
            kwargs: Passed to the
                [`DeltaMergeConfig`][cloe_nessy.integration.writer.delta_merge_writer.DeltaMergeConfig].

        Raises:
            ValueError: If neither a Table object nor both table_identifier
                and storage_path are provided.
            EmptyDataframeException: If the source dataframe is empty and
                ignore_empty_df is False.
            ValueError: If the specified columns for update or insert do not
                exist in the DataFrame or are explicitly excluded from the
                merge operation.
            ValueError: If partition columns are not specified when using
                partition pruning.
        """
        if self._empty_dataframe_check(data_frame, ignore_empty_df):
            return
        table_identifier, storage_path = self._validate_table_inputs(table, table_identifier, storage_path)

        config = DeltaMergeConfig(dataframe_columns=data_frame.columns, **kwargs)

        delta_table = self.table_manager.get_delta_table(location=storage_path, spark=data_frame.sparkSession)

        match_conditions = self._build_match_conditions(data_frame, config)

        delta_table_merge = self._build_merge_operations(delta_table, data_frame, config, match_conditions)
        delta_table_merge.execute()
        self._report_delta_table_operation_metrics(
            table_identifier,
            operation_type=DeltaTableOperationType.MERGE,
        )

    @table_log_decorator(operation="stream_merge")
    def write_stream(self):
        """Not implemented yet. See docs for more details."""
        raise NotImplementedError(
            "Streaming merge is not implemented yet. Please use the `write` method for batch merges."
        )

_build_match_conditions(data_frame, config)

Builds match conditions for the Delta table merge.

Source code in src/cloe_nessy/integration/writer/delta_writer/delta_merge_writer.py
def _build_match_conditions(self, data_frame: DataFrame, config: DeltaMergeConfig) -> str:
    """Builds match conditions for the Delta table merge."""
    match_conditions = self._merge_match_conditions(config.key_columns)
    if config.use_partition_pruning:
        match_conditions_list = [match_conditions] + [
            self._partition_pruning_conditions(data_frame, config.partition_by),
        ]
        match_conditions = " AND ".join(match_conditions_list)
    return match_conditions
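
To make the resulting predicate concrete, here is a standalone sketch of the joining pattern, assuming `_merge_match_conditions` equates each key column across the `target` and `source` aliases used by `_build_merge_operations` (the exact string format lives in the base writer and is not shown here):

# Hypothetical, simplified re-implementation for illustration only.
def build_conditions(key_columns: list[str], pruning_condition: str | None) -> str:
    match = " AND ".join(f"target.{col} = source.{col}" for col in key_columns)
    return f"{match} AND {pruning_condition}" if pruning_condition else match

print(build_conditions(["order_id"], "target.region IN ('EU', 'US')"))
# -> target.order_id = source.order_id AND target.region IN ('EU', 'US')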

_build_merge_operations(delta_table, data_frame, config, match_conditions)

Builds the Delta table merge operations.

Source code in src/cloe_nessy/integration/writer/delta_writer/delta_merge_writer.py
def _build_merge_operations(
    self, delta_table, data_frame: DataFrame, config: DeltaMergeConfig, match_conditions: str
):
    """Builds the Delta table merge operations."""
    delta_table_merge = delta_table.alias("target").merge(
        source=data_frame.alias("source"),
        condition=match_conditions,
    )
    if config.when_matched_update:
        delta_table_merge = delta_table_merge.whenMatchedUpdate(set=config.final_cols_to_update)
    elif config.when_matched_delete:
        delta_table_merge = delta_table_merge.whenMatchedDelete()
    if config.when_not_matched_insert:
        delta_table_merge = delta_table_merge.whenNotMatchedInsert(values=config.final_cols_to_insert)
    return delta_table_merge
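
For reference, the builder chain this helper assembles corresponds to the following direct use of the delta-spark API (table path, match condition, and column mappings are illustrative):

from delta.tables import DeltaTable
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
updates_df = spark.createDataFrame([(1, 5)], ["order_id", "qty"])

# Equivalent hand-written merge for a when_matched_update + when_not_matched_insert config.
(
    DeltaTable.forPath(spark, "/tmp/delta/orders")  # placeholder table location
    .alias("target")
    .merge(source=updates_df.alias("source"), condition="target.order_id = source.order_id")
    .whenMatchedUpdate(set={"qty": "source.qty"})
    .whenNotMatchedInsert(values={"order_id": "source.order_id", "qty": "source.qty"})
    .execute()
)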

_validate_table_inputs(table, table_identifier, storage_path)

Validates and retrieves table identifier and storage path.

Source code in src/cloe_nessy/integration/writer/delta_writer/delta_merge_writer.py
def _validate_table_inputs(
    self, table: Table | None, table_identifier: str | None, storage_path: str | None
) -> tuple[str, str]:
    """Validates and retrieves table identifier and storage path."""
    if table is None and (table_identifier is None or storage_path is None):
        raise ValueError("Either a Table object or table_identifier and storage_path must be provided.")
    if table is not None:
        table_identifier = table.identifier
        storage_path = str(table.storage_path)
    if not storage_path:
        raise ValueError("Storage path must be provided or extracted from the Table object.")
    assert table_identifier is not None, "Table identifier must be provided."
    return table_identifier, storage_path
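
The contract in brief, with hypothetical values: either pass a Table object, or pass both the identifier and the storage path explicitly.

writer = DeltaMergeWriter()

# Valid: identifier and storage path supplied together.
writer._validate_table_inputs(
    table=None,
    table_identifier="main.sales.orders",
    storage_path="abfss://lake@account.dfs.core.windows.net/sales/orders",
)

# Invalid: neither a Table object nor the identifier/path pair -> ValueError.
writer._validate_table_inputs(table=None, table_identifier=None, storage_path=None)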

write(data_frame, table=None, table_identifier=None, storage_path=None, ignore_empty_df=False, **kwargs)

Merges the data in a Spark DataFrame into a Delta table.

This function performs a merge operation between a DataFrame and a Delta table. It supports update, delete, and insert operations on the target Delta table based on user-specified conditions, as well as partition pruning to optimize the performance of the merge operation.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `table` | `Table \| None` | The Table object representing the Delta table. | `None` |
| `table_identifier` | `str \| None` | The identifier of the Delta table in the format 'catalog.schema.table'. | `None` |
| `storage_path` | `str \| None` | The location of the Delta table. | `None` |
| `data_frame` | `DataFrame` | The DataFrame to be merged into the Delta table. | *required* |
| `ignore_empty_df` | `bool` | A flag indicating whether to ignore an empty source dataframe. | `False` |
| `kwargs` | `Any` | Passed to the [`DeltaMergeConfig`][cloe_nessy.integration.writer.delta_merge_writer.DeltaMergeConfig]. | `{}` |

Raises:

| Type | Description |
| --- | --- |
| `ValueError` | If neither a Table object nor both table_identifier and storage_path are provided. |
| `EmptyDataframeException` | If the source dataframe is empty and ignore_empty_df is False. |
| `ValueError` | If the specified columns for update or insert do not exist in the DataFrame or are explicitly excluded from the merge operation. |
| `ValueError` | If partition columns are not specified when using partition pruning. |

Source code in src/cloe_nessy/integration/writer/delta_writer/delta_merge_writer.py
@table_log_decorator(operation="merge")
def write(
    self,
    data_frame: DataFrame,
    table: Table | None = None,
    table_identifier: str | None = None,
    storage_path: str | None = None,
    ignore_empty_df: bool = False,
    **kwargs: Any,
):
    """Merges the data in a spark DataFrame into a Delta table.

    This function performs a merge operation between a DataFrame and a Delta
    table. The function supports update, delete, and insert operations on
    the target Delta table based on conditions specified by the user. The
    function also supports partition pruning to optimize the performance of
    the merge operation.

    Args:
        table: The Table object representing the Delta table.
        table_identifier: The identifier of the Delta table in the format
            'catalog.schema.table'.
        storage_path: The location of the Delta table.
        data_frame: The DataFrame to be merged into the Delta table.
        ignore_empty_df: A flag indicating whether to ignore an empty source
            dataframe.
        kwargs: Passed to the
            [`DeltaMergeConfig`][cloe_nessy.integration.writer.delta_merge_writer.DeltaMergeConfig].

    Raises:
        ValueError: If neither a Table object nor both table_identifier
            and storage_path are provided.
        EmptyDataframeException: If the source dataframe is empty and
            ignore_empty_df is False.
        ValueError: If the specified columns for update or insert do not
            exist in the DataFrame or are explicitly excluded from the
            merge operation.
        ValueError: If partition columns are not specified when using
            partition pruning.
    """
    if self._empty_dataframe_check(data_frame, ignore_empty_df):
        return
    table_identifier, storage_path = self._validate_table_inputs(table, table_identifier, storage_path)

    config = DeltaMergeConfig(dataframe_columns=data_frame.columns, **kwargs)

    delta_table = self.table_manager.get_delta_table(location=storage_path, spark=data_frame.sparkSession)

    match_conditions = self._build_match_conditions(data_frame, config)

    delta_table_merge = self._build_merge_operations(delta_table, data_frame, config, match_conditions)
    delta_table_merge.execute()
    self._report_delta_table_operation_metrics(
        table_identifier,
        operation_type=DeltaTableOperationType.MERGE,
    )
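
Example: a merge-write sketch. The keyword arguments after the positional ones are forwarded to DeltaMergeConfig; the names below (key_columns, when_matched_update, when_not_matched_insert, use_partition_pruning, partition_by) are the config fields referenced in the source above, but the authoritative signature lives in DeltaMergeConfig. All identifiers and paths are placeholders.

from pyspark.sql import SparkSession

from cloe_nessy.integration.writer.delta_writer.delta_merge_writer import DeltaMergeWriter

spark = SparkSession.builder.getOrCreate()
updates_df = spark.createDataFrame([(1, "EU", 5)], ["order_id", "region", "qty"])

writer = DeltaMergeWriter()
writer.write(
    data_frame=updates_df,
    table_identifier="main.sales.orders",  # placeholder identifier
    storage_path="abfss://lake@account.dfs.core.windows.net/sales/orders",  # placeholder path
    key_columns=["order_id"],          # match rows on these columns
    when_matched_update=True,          # update matched rows
    when_not_matched_insert=True,      # insert unmatched rows
    use_partition_pruning=True,        # add partition filters to the match condition
    partition_by=["region"],           # required when partition pruning is enabled
)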

write_stream()

Not implemented yet. See docs for more details.

Source code in src/cloe_nessy/integration/writer/delta_writer/delta_merge_writer.py
@table_log_decorator(operation="stream_merge")
def write_stream(self):
    """Not implemented yet. See docs for more details."""
    raise NotImplementedError(
        "Streaming merge is not implemented yet. Please use the `write` method for batch merges."
    )