CatalogWriter

A writer for Catalog tables.

Source code in src/cloe_nessy/integration/writer/catalog_writer.py
class CatalogWriter:
    """A writer for Catalog tables."""

    @staticmethod
    def write_table(
        df: DataFrame | None,
        table_identifier: str | None,
        partition_by: str | list[str] | None = None,
        options: dict[str, str] | None = None,
        mode: str = "append",
    ) -> None:
        """Write a table to the unity catalog.

        Args:
            df: The DataFrame to write.
            table_identifier: The table identifier in the Unity Catalog in the
                              format 'catalog.schema.table'.
            mode: The write mode. One of append, overwrite, error, errorifexists, ignore.
            partition_by: Names of the partitioning columns.
            options: PySpark options for the DataFrame.saveAsTable operation (e.g. mergeSchema:true).

        Notes:
            append: Append contents of this DataFrame to existing data.
            overwrite: Overwrite existing data.
            error or errorifexists: Throw an exception if data already exists.
            ignore: Silently ignore this operation if data already exists.

        Raises:
            ValueError: If the mode is not one of append, overwrite, error, errorifexists, ignore.
            ValueError: If the table_identifier is not a string or not in the format 'catalog.schema.table'.
            ValueError: If the DataFrame is None.
        """
        if mode not in ("append", "overwrite", "error", "errorifexists", "ignore"):
            raise ValueError("mode must be one of append, overwrite, error, errorifexists, ignore")
        if not table_identifier:
            raise ValueError("table_identifier is required")
        elif not isinstance(table_identifier, str):
            raise ValueError("table_identifier must be a string")
        elif len(table_identifier.split(".")) != 3:
            raise ValueError("table_identifier must be in the format 'catalog.schema.table'")
        if df is None:
            raise ValueError("df is required, but was None.")
        if options is None:
            options = {}
        df.write.saveAsTable(table_identifier, mode=mode, partitionBy=partition_by, **options)

write_table(df, table_identifier, partition_by=None, options=None, mode='append') staticmethod

Write a table to the Unity Catalog.

Parameters:

    df (DataFrame | None): The DataFrame to write. Required.
    table_identifier (str | None): The table identifier in the Unity Catalog in the format 'catalog.schema.table'. Required.
    mode (str): The write mode. One of append, overwrite, error, errorifexists, ignore. Default: 'append'.
    partition_by (str | list[str] | None): Names of the partitioning columns. Default: None.
    options (dict[str, str] | None): PySpark options for the DataFrame.saveAsTable operation (e.g. mergeSchema:true). Default: None.

Notes:

    append: Append contents of this DataFrame to existing data.
    overwrite: Overwrite existing data.
    error or errorifexists: Throw an exception if data already exists.
    ignore: Silently ignore this operation if data already exists.

Raises:

    ValueError: If the mode is not one of append, overwrite, error, errorifexists, ignore.
    ValueError: If the table_identifier is not a string or not in the format 'catalog.schema.table'.
    ValueError: If the DataFrame is None.

Source code in src/cloe_nessy/integration/writer/catalog_writer.py
@staticmethod
def write_table(
    df: DataFrame | None,
    table_identifier: str | None,
    partition_by: str | list[str] | None = None,
    options: dict[str, str] | None = None,
    mode: str = "append",
) -> None:
    """Write a table to the unity catalog.

    Args:
        df: The DataFrame to write.
        table_identifier: The table identifier in the Unity Catalog in the
                          format 'catalog.schema.table'.
        mode: The write mode. One of append, overwrite, error, errorifexists, ignore.
        partition_by: Names of the partitioning columns.
        options: PySpark options for the DataFrame.saveAsTable operation (e.g. mergeSchema:true).

    Notes:
        append: Append contents of this DataFrame to existing data.
        overwrite: Overwrite existing data.
        error or errorifexists: Throw an exception if data already exists.
        ignore: Silently ignore this operation if data already exists.

    Raises:
        ValueError: If the mode is not one of append, overwrite, error, errorifexists, ignore.
        ValueError: If the table_identifier is not a string or not in the format 'catalog.schema.table'.
        ValueError: If the DataFrame is None.
    """
    if mode not in ("append", "overwrite", "error", "errorifexists", "ignore"):
        raise ValueError("mode must be one of append, overwrite, error, errorifexists, ignore")
    if not table_identifier:
        raise ValueError("table_identifier is required")
    elif not isinstance(table_identifier, str):
        raise ValueError("table_identifier must be a string")
    elif len(table_identifier.split(".")) != 3:
        raise ValueError("table_identifier must be in the format 'catalog.schema.table'")
    if df is None:
        raise ValueError("df is required, but was None.")
    if options is None:
        options = {}
    df.write.saveAsTable(table_identifier, mode=mode, partitionBy=partition_by, **options)
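
Example:

A minimal, illustrative sketch of a batch append to a Unity Catalog table. The import path is inferred from the source location shown above, and the catalog, schema, and table names are placeholders.

from pyspark.sql import SparkSession

from cloe_nessy.integration.writer.catalog_writer import CatalogWriter

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "value"])

# Append the DataFrame to a Unity Catalog table, partitioned by 'id'.
CatalogWriter.write_table(
    df=df,
    table_identifier="my_catalog.my_schema.my_table",  # placeholder identifier
    partition_by="id",
    mode="append",
    options={"mergeSchema": "true"},  # forwarded to DataFrame.saveAsTable
)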

DeltaAppendWriter

Bases: BaseDeltaWriter

A class for appending DataFrames to Delta tables.

Source code in src/cloe_nessy/integration/writer/delta_writer/delta_append_writer.py
class DeltaAppendWriter(BaseDeltaWriter):
    """A class for appending DataFrames to Delta tables."""

    def __init__(self):
        super().__init__()
        self._spark = SessionManager.get_spark_session()
        self._dbutils = SessionManager.get_utils()

    @table_log_decorator(operation="append")
    def write(
        self,
        table_identifier: str,
        table_location: str,
        data_frame: DataFrame,
        ignore_empty_df: bool = False,
        options: dict[str, str] | None = None,
    ):
        """Appends the provided DataFrame to a Delta table.

        Args:
            table_identifier: The identifier of the Delta table in the format 'catalog.schema.table'.
            table_location: The location of the Delta table.
            data_frame: The DataFrame to append to the table.
            ignore_empty_df: If True, the function returns early without
                doing anything if the DataFrame is empty.
            options: Additional keyword arguments that will be passed to the 'write' method of the
                FileWriter instance. These can be any parameters accepted by the 'write'
                method, which could include options for configuring the write operation, such as
                'checkpointLocation' for specifying the path where checkpoints will be stored, or
                'path' for specifying the path where the output data will be written.
        """
        if self._empty_dataframe_check(data_frame, ignore_empty_df):
            return
        writer = FileWriter()
        writer.write(
            data_frame=data_frame,
            location=table_location,
            format="DELTA",
            mode="APPEND",
            options=options,
        )
        self._report_delta_table_operation_metrics(
            table_identifier=table_identifier, operation_type=DeltaTableOperationType.WRITE
        )

    @table_log_decorator(operation="stream_append")
    def write_stream(
        self,
        table_identifier: str,
        table_location: str,
        data_frame: DataFrame,
        checkpoint_location: str | None = None,
        trigger_dict: dict | None = None,
        options: dict[str, str] | None = None,
        await_termination: bool = False,
    ):
        """Appends the provided DataFrame to a Delta table.

        Args:
            table_identifier: The identifier of the Delta table in the format 'catalog.schema.table'.
            table_location: The location of the Delta table.
            data_frame: The DataFrame to append to the table.
            checkpoint_location: Location of checkpoint. If None, defaults
                to the location of the table being written, with '_checkpoint_'
                added before name. Default None.
            trigger_dict: A dictionary specifying the trigger configuration for the streaming query.
                Supported keys include:

                - "processingTime": Specifies a time interval (e.g., "10 seconds") for micro-batch processing.
                - "once": Processes all available data once and then stops.
                - "continuous": Specifies a time interval (e.g., "1 second") for continuous processing.
                - "availableNow": Processes all available data immediately and then stops.

                If nothing is provided, the default is {"availableNow": True}.
            options: Additional keyword arguments that will be passed to the
                'write_stream' method of the FileWriter instance. These can be
                any parameters accepted by the 'write_stream' method, which could
                include options for configuring the write operation.
            await_termination: If True, the function will wait for the streaming
                query to finish before returning. This is useful for ensuring that
                the data has been fully written before proceeding with other
                operations.

        Returns:
            None.
        """
        writer = FileWriter()
        writer.write_stream(
            data_frame=data_frame,
            location=table_location,
            format="DELTA",
            checkpoint_location=checkpoint_location,
            mode="APPEND",
            trigger_dict=trigger_dict,
            options=options,
            await_termination=await_termination,
        )
        self._report_delta_table_operation_metrics(
            table_identifier=table_identifier, operation_type=DeltaTableOperationType.WRITE
        )

write(table_identifier, table_location, data_frame, ignore_empty_df=False, options=None)

Appends the provided DataFrame to a Delta table.

Parameters:

    table_identifier (str): The identifier of the Delta table in the format 'catalog.schema.table'. Required.
    table_location (str): The location of the Delta table. Required.
    data_frame (DataFrame): The DataFrame to append to the table. Required.
    ignore_empty_df (bool): If True, the function returns early without doing anything if the DataFrame is empty. Default: False.
    options (dict[str, str] | None): Additional keyword arguments that will be passed to the 'write' method of the FileWriter instance. These can be any parameters accepted by the 'write' method, which could include options for configuring the write operation, such as 'checkpointLocation' for specifying the path where checkpoints will be stored, or 'path' for specifying the path where the output data will be written. Default: None.
Source code in src/cloe_nessy/integration/writer/delta_writer/delta_append_writer.py
@table_log_decorator(operation="append")
def write(
    self,
    table_identifier: str,
    table_location: str,
    data_frame: DataFrame,
    ignore_empty_df: bool = False,
    options: dict[str, str] | None = None,
):
    """Appends the provided DataFrame to a Delta table.

    Args:
        table_identifier: The identifier of the Delta table in the format 'catalog.schema.table'.
        table_location: The location of the Delta table.
        data_frame: The DataFrame to append to the table.
        ignore_empty_df: If True, the function returns early without
            doing anything if the DataFrame is empty.
        options: Additional keyword arguments that will be passed to the 'write' method of the
            FileDataFrameWriter instance. These can be any parameters accepted by the 'write'
            method, which could include options for configuring the write operation, such as
            'checkpointLocation' for specifying the path where checkpoints will be stored, or
            'path' for specifying the path where the output data will be written.
    """
    if self._empty_dataframe_check(data_frame, ignore_empty_df):
        return
    writer = FileWriter()
    writer.write(
        data_frame=data_frame,
        location=table_location,
        format="DELTA",
        mode="APPEND",
        options=options,
    )
    self._report_delta_table_operation_metrics(
        table_identifier=table_identifier, operation_type=DeltaTableOperationType.WRITE
    )
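
Example:

A minimal, illustrative sketch of a batch append. The import path is inferred from the source location shown above, the table identifier and location are placeholders, and it assumes an environment where SessionManager can provide a Spark session.

from pyspark.sql import SparkSession

from cloe_nessy.integration.writer.delta_writer.delta_append_writer import DeltaAppendWriter

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "value"])

writer = DeltaAppendWriter()
# Append the DataFrame to the Delta table; skip silently if the DataFrame is empty.
writer.write(
    table_identifier="my_catalog.my_schema.events",  # placeholder identifier
    table_location="/tmp/tables/events",             # placeholder location
    data_frame=df,
    ignore_empty_df=True,
    options={"mergeSchema": "true"},  # forwarded to FileWriter.write
)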

write_stream(table_identifier, table_location, data_frame, checkpoint_location=None, trigger_dict=None, options=None, await_termination=False)

Appends the provided DataFrame to a Delta table.

Parameters:

    table_identifier (str): The identifier of the Delta table in the format 'catalog.schema.table'. Required.
    table_location (str): The location of the Delta table. Required.
    data_frame (DataFrame): The DataFrame to append to the table. Required.
    checkpoint_location (str | None): Location of the checkpoint. If None, defaults to the location of the table being written, with '_checkpoint_' added before the name. Default: None.
    trigger_dict (dict | None): A dictionary specifying the trigger configuration for the streaming query. Supported keys include:

        - "processingTime": Specifies a time interval (e.g., "10 seconds") for micro-batch processing.
        - "once": Processes all available data once and then stops.
        - "continuous": Specifies a time interval (e.g., "1 second") for continuous processing.
        - "availableNow": Processes all available data immediately and then stops.

        If nothing is provided, the default is {"availableNow": True}. Default: None.
    options (dict[str, str] | None): Additional keyword arguments that will be passed to the 'write_stream' method of the FileWriter instance. These can be any parameters accepted by the 'write_stream' method, which could include options for configuring the write operation. Default: None.
    await_termination (bool): If True, the function will wait for the streaming query to finish before returning. This is useful for ensuring that the data has been fully written before proceeding with other operations. Default: False.

Returns:

    None.

Source code in src/cloe_nessy/integration/writer/delta_writer/delta_append_writer.py
@table_log_decorator(operation="stream_append")
def write_stream(
    self,
    table_identifier: str,
    table_location: str,
    data_frame: DataFrame,
    checkpoint_location: str | None = None,
    trigger_dict: dict | None = None,
    options: dict[str, str] | None = None,
    await_termination: bool = False,
):
    """Appends the provided DataFrame to a Delta table.

    Args:
        table_identifier: The identifier of the Delta table in the format 'catalog.schema.table'.
        table_location: The location of the Delta table.
        data_frame: The DataFrame to append to the table.
        checkpoint_location: Location of checkpoint. If None, defaults
            to the location of the table being written, with '_checkpoint_'
            added before name. Default None.
        trigger_dict: A dictionary specifying the trigger configuration for the streaming query.
            Supported keys include:

            - "processingTime": Specifies a time interval (e.g., "10 seconds") for micro-batch processing.
            - "once": Processes all available data once and then stops.
            - "continuous": Specifies a time interval (e.g., "1 second") for continuous processing.
            - "availableNow": Processes all available data immediately and then stops.

            If nothing is provided, the default is {"availableNow": True}.
        options: Additional keyword arguments that will be passed to the
            'write_stream' method of the FileWriter instance. These can be
            any parameters accepted by the 'write_stream' method, which could
            include options for configuring the write operation.
        await_termination: If True, the function will wait for the streaming
            query to finish before returning. This is useful for ensuring that
            the data has been fully written before proceeding with other
            operations.

    Returns:
        None.
    """
    writer = FileWriter()
    writer.write_stream(
        data_frame=data_frame,
        location=table_location,
        format="DELTA",
        checkpoint_location=checkpoint_location,
        mode="APPEND",
        trigger_dict=trigger_dict,
        options=options,
        await_termination=await_termination,
    )
    self._report_delta_table_operation_metrics(
        table_identifier=table_identifier, operation_type=DeltaTableOperationType.WRITE
    )
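
Example:

A minimal, illustrative sketch of a streaming append. The import path is inferred from the source location shown above, the table identifier and location are placeholders, and the 'rate' source stands in for a real streaming input.

from pyspark.sql import SparkSession

from cloe_nessy.integration.writer.delta_writer.delta_append_writer import DeltaAppendWriter

spark = SparkSession.builder.getOrCreate()
# A toy streaming source; replace with the actual readStream input.
streaming_df = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

writer = DeltaAppendWriter()
writer.write_stream(
    table_identifier="my_catalog.my_schema.events",  # placeholder identifier
    table_location="/tmp/tables/events",             # placeholder location
    data_frame=streaming_df,
    checkpoint_location=None,             # derived from the table location with a '_checkpoint_' prefix
    trigger_dict={"availableNow": True},  # process all available data, then stop
    await_termination=True,
)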

DeltaMergeWriter

Bases: BaseDeltaWriter

A class for merging DataFrames to Delta tables.

Source code in src/cloe_nessy/integration/writer/delta_writer/delta_merge_writer.py
class DeltaMergeWriter(BaseDeltaWriter):
    """A class for merging DataFrames to Delta tables."""

    def __init__(self):
        super().__init__()
        self._spark = SessionManager.get_spark_session()
        self._dbutils = SessionManager.get_utils()

    def _validate_table_inputs(
        self, table: Table | None, table_identifier: str | None, storage_path: str | None
    ) -> tuple[str, str]:
        """Validates and retrieves table identifier and storage path."""
        if table is None and (table_identifier is None or storage_path is None):
            raise ValueError("Either a Table object or table_identifier and storage_path must be provided.")
        if table is not None:
            table_identifier = table.identifier
            storage_path = str(table.storage_path)
        if not storage_path:
            raise ValueError("Storage path must be provided or extracted from the Table object.")
        assert table_identifier is not None, "Table identifier must be provided."
        return table_identifier, storage_path

    def _build_match_conditions(self, data_frame: DataFrame, config: DeltaMergeConfig) -> str:
        """Builds match conditions for the Delta table merge."""
        match_conditions = self._merge_match_conditions(config.key_columns)
        if config.use_partition_pruning:
            match_conditions_list = [match_conditions] + [
                self._partition_pruning_conditions(data_frame, config.partition_by),
            ]
            match_conditions = " AND ".join(match_conditions_list)
        return match_conditions

    def _build_merge_operations(
        self, delta_table, data_frame: DataFrame, config: DeltaMergeConfig, match_conditions: str
    ):
        """Builds the Delta table merge operations."""
        delta_table_merge = delta_table.alias("target").merge(
            source=data_frame.alias("source"),
            condition=match_conditions,
        )
        if config.when_matched_update:
            delta_table_merge = delta_table_merge.whenMatchedUpdate(set=config.final_cols_to_update)
        elif config.when_matched_delete:
            delta_table_merge = delta_table_merge.whenMatchedDelete()
        if config.when_not_matched_insert:
            delta_table_merge = delta_table_merge.whenNotMatchedInsert(values=config.final_cols_to_insert)
        return delta_table_merge

    @table_log_decorator(operation="merge")
    def write(
        self,
        data_frame: DataFrame,
        table: Table | None = None,
        table_identifier: str | None = None,
        storage_path: str | None = None,
        ignore_empty_df: bool = False,
        **kwargs: Any,
    ):
        """Merges the data in a spark DataFrame into a Delta table.

        This function performs a merge operation between a DataFrame and a Delta
        table. The function supports update, delete, and insert operations on
        the target Delta table based on conditions specified by the user. The
        function also supports partition pruning to optimize the performance of
        the merge operation.

        Args:
            table: The Table object representing the Delta table.
            table_identifier: The identifier of the Delta table in the format
                'catalog.schema.table'.
            storage_path: The location of the Delta table.
            data_frame: The DataFrame to be merged into the Delta table.
            ignore_empty_df: A flag indicating whether to ignore an empty source
                dataframe.
            kwargs: Passed to the
                [`DeltaMergeConfig`][cloe_nessy.integration.writer.delta_merge_writer.DeltaMergeConfig].

        Raises:
            ValueError: If neither a Table object nor both table_identifier and storage_path are provided.
            EmptyDataframeException: If the source dataframe is empty and
                ignore_empty_df is False.
            ValueError: If the specified columns for update or insert do not
                exist in the DataFrame or are explicitly excluded from the
                merge operation.
            ValueError: If partition columns are not specified when using
                partition pruning.
        """
        if self._empty_dataframe_check(data_frame, ignore_empty_df):
            return
        table_identifier, storage_path = self._validate_table_inputs(table, table_identifier, storage_path)

        config = DeltaMergeConfig(dataframe_columns=data_frame.columns, **kwargs)

        delta_table = self.table_manager.get_delta_table(location=storage_path, spark=data_frame.sparkSession)

        match_conditions = self._build_match_conditions(data_frame, config)

        delta_table_merge = self._build_merge_operations(delta_table, data_frame, config, match_conditions)
        delta_table_merge.execute()
        self._report_delta_table_operation_metrics(
            table_identifier,
            operation_type=DeltaTableOperationType.MERGE,
        )

    @table_log_decorator(operation="stream_merge")
    def write_stream(self):
        """Not implemented yet. See docs for more details."""
        raise NotImplementedError(
            "Streaming merge is not implemented yet. Please use the `write` method for batch merges."
        )

_build_match_conditions(data_frame, config)

Builds match conditions for the Delta table merge.

Source code in src/cloe_nessy/integration/writer/delta_writer/delta_merge_writer.py
def _build_match_conditions(self, data_frame: DataFrame, config: DeltaMergeConfig) -> str:
    """Builds match conditions for the Delta table merge."""
    match_conditions = self._merge_match_conditions(config.key_columns)
    if config.use_partition_pruning:
        match_conditions_list = [match_conditions] + [
            self._partition_pruning_conditions(data_frame, config.partition_by),
        ]
        match_conditions = " AND ".join(match_conditions_list)
    return match_conditions

_build_merge_operations(delta_table, data_frame, config, match_conditions)

Builds the Delta table merge operations.

Source code in src/cloe_nessy/integration/writer/delta_writer/delta_merge_writer.py
def _build_merge_operations(
    self, delta_table, data_frame: DataFrame, config: DeltaMergeConfig, match_conditions: str
):
    """Builds the Delta table merge operations."""
    delta_table_merge = delta_table.alias("target").merge(
        source=data_frame.alias("source"),
        condition=match_conditions,
    )
    if config.when_matched_update:
        delta_table_merge = delta_table_merge.whenMatchedUpdate(set=config.final_cols_to_update)
    elif config.when_matched_delete:
        delta_table_merge = delta_table_merge.whenMatchedDelete()
    if config.when_not_matched_insert:
        delta_table_merge = delta_table_merge.whenNotMatchedInsert(values=config.final_cols_to_insert)
    return delta_table_merge

_validate_table_inputs(table, table_identifier, storage_path)

Validates and retrieves table identifier and storage path.

Source code in src/cloe_nessy/integration/writer/delta_writer/delta_merge_writer.py
def _validate_table_inputs(
    self, table: Table | None, table_identifier: str | None, storage_path: str | None
) -> tuple[str, str]:
    """Validates and retrieves table identifier and storage path."""
    if table is None and (table_identifier is None or storage_path is None):
        raise ValueError("Either a Table object or table_identifier and storage_path must be provided.")
    if table is not None:
        table_identifier = table.identifier
        storage_path = str(table.storage_path)
    if not storage_path:
        raise ValueError("Storage path must be provided or extracted from the Table object.")
    assert table_identifier is not None, "Table identifier must be provided."
    return table_identifier, storage_path

write(data_frame, table=None, table_identifier=None, storage_path=None, ignore_empty_df=False, **kwargs)

Merges the data in a spark DataFrame into a Delta table.

This function performs a merge operation between a DataFrame and a Delta table. The function supports update, delete, and insert operations on the target Delta table based on conditions specified by the user. The function also supports partition pruning to optimize the performance of the merge operation.

Parameters:

    table (Table | None): The Table object representing the Delta table. Default: None.
    table_identifier (str | None): The identifier of the Delta table in the format 'catalog.schema.table'. Default: None.
    storage_path (str | None): The location of the Delta table. Default: None.
    data_frame (DataFrame): The DataFrame to be merged into the Delta table. Required.
    ignore_empty_df (bool): A flag indicating whether to ignore an empty source dataframe. Default: False.
    kwargs (Any): Passed to the DeltaMergeConfig (cloe_nessy.integration.writer.delta_merge_writer.DeltaMergeConfig). Default: {}.

Raises:

    ValueError: If neither a Table object nor both table_identifier and storage_path are provided.
    EmptyDataframeException: If the source dataframe is empty and ignore_empty_df is False.
    ValueError: If the specified columns for update or insert do not exist in the DataFrame or are explicitly excluded from the merge operation.
    ValueError: If partition columns are not specified when using partition pruning.

Source code in src/cloe_nessy/integration/writer/delta_writer/delta_merge_writer.py
@table_log_decorator(operation="merge")
def write(
    self,
    data_frame: DataFrame,
    table: Table | None = None,
    table_identifier: str | None = None,
    storage_path: str | None = None,
    ignore_empty_df: bool = False,
    **kwargs: Any,
):
    """Merges the data in a spark DataFrame into a Delta table.

    This function performs a merge operation between a DataFrame and a Delta
    table. The function supports update, delete, and insert operations on
    the target Delta table based on conditions specified by the user. The
    function also supports partition pruning to optimize the performance of
    the merge operation.

    Args:
        table: The Table object representing the Delta table.
        table_identifier: The identifier of the Delta table in the format
            'catalog.schema.table'.
        storage_path: The location of the Delta table.
        data_frame: The DataFrame to be merged into the Delta table.
        ignore_empty_df: A flag indicating whether to ignore an empty source
            dataframe.
        kwargs: Passed to the
            [`DeltaMergeConfig`][cloe_nessy.integration.writer.delta_merge_writer.DeltaMergeConfig].

    Raises:
        ValueError: If neither a Table object nor both table_identifier and storage_path are provided.
        EmptyDataframeException: If the source dataframe is empty and
            ignore_empty_df is False.
        ValueError: If the specified columns for update or insert do not
            exist in the DataFrame or are explicitly excluded from the
            merge operation.
        ValueError: If partition columns are not specified when using
            partition pruning.
    """
    if self._empty_dataframe_check(data_frame, ignore_empty_df):
        return
    table_identifier, storage_path = self._validate_table_inputs(table, table_identifier, storage_path)

    config = DeltaMergeConfig(dataframe_columns=data_frame.columns, **kwargs)

    delta_table = self.table_manager.get_delta_table(location=storage_path, spark=data_frame.sparkSession)

    match_conditions = self._build_match_conditions(data_frame, config)

    delta_table_merge = self._build_merge_operations(delta_table, data_frame, config, match_conditions)
    delta_table_merge.execute()
    self._report_delta_table_operation_metrics(
        table_identifier,
        operation_type=DeltaTableOperationType.MERGE,
    )
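
Example:

A minimal, illustrative sketch of an upsert-style merge. The import path is inferred from the source location shown above, the identifier and storage path are placeholders, and the keyword arguments key_columns, when_matched_update, and when_not_matched_insert are assumed DeltaMergeConfig options, inferred from how the config is used in the source; consult DeltaMergeConfig for the authoritative names.

from pyspark.sql import SparkSession

from cloe_nessy.integration.writer.delta_writer.delta_merge_writer import DeltaMergeWriter

spark = SparkSession.builder.getOrCreate()
updates_df = spark.createDataFrame([(1, "updated"), (3, "new")], ["id", "value"])

writer = DeltaMergeWriter()
# Match on 'id': update existing rows, insert unmatched rows.
writer.write(
    data_frame=updates_df,
    table_identifier="my_catalog.my_schema.customers",  # placeholder identifier
    storage_path="/tmp/tables/customers",               # placeholder path to an existing Delta table
    key_columns=["id"],            # assumed DeltaMergeConfig option
    when_matched_update=True,      # assumed DeltaMergeConfig option
    when_not_matched_insert=True,  # assumed DeltaMergeConfig option
)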

write_stream()

Not implemented yet. See docs for more details.

Source code in src/cloe_nessy/integration/writer/delta_writer/delta_merge_writer.py
@table_log_decorator(operation="stream_merge")
def write_stream(self):
    """Not implemented yet. See docs for more details."""
    raise NotImplementedError(
        "Streaming merge is not implemented yet. Please use the `write` method for batch merges."
    )

FileWriter

Bases: BaseWriter

Utility class for writing a DataFrame to a file.

Source code in src/cloe_nessy/integration/writer/file_writer.py
class FileWriter(BaseWriter):
    """Utility class for writing a DataFrame to a file."""

    def __init__(self):
        super().__init__()

    def _get_writer(self, df: DataFrame) -> DataFrameWriter:
        """Returns a DataFrameWriter."""
        return df.write

    def _get_stream_writer(self, df: DataFrame) -> DataStreamWriter:
        """Returns a DataStreamWriter."""
        return df.writeStream

    def _log_operation(self, location: str, status: str, error: str | None = None):
        """Logs the status of an operation."""
        if status == "start":
            self._console_logger.info(f"Starting to write to {location}")
        elif status == "succeeded":
            self._console_logger.info(f"Successfully wrote to {location}")
        elif status == "failed":
            self._console_logger.error(f"Failed to write to {location}: {error}")

    def _validate_trigger(self, trigger_dict: dict[str, Any]):
        """Validates the trigger type."""
        triggers = ["processingTime", "once", "continuous", "availableNow"]
        if not any(trigger in trigger_dict for trigger in triggers):
            raise ValueError(f"Invalid trigger type. Supported types are: {', '.join(triggers)}")

    def write_stream(
        self,
        data_frame: DataFrame | None = None,
        location: str | None = None,
        format: str = "delta",
        checkpoint_location: str | None = None,
        partition_cols: list[str] | None = None,
        mode: str = "append",
        trigger_dict: dict | None = None,
        options: dict[str, Any] | None = None,
        await_termination: bool = False,
        **_: Any,
    ):
        """Writes a dataframe to specified location in specified format as a stream.

        Args:
            data_frame: The DataFrame to write.
            location: The location to write the DataFrame to.
            format: The format to write the DataFrame in.
            checkpoint_location: Location of checkpoint. If None, defaults
                to the location of the table being written, with '_checkpoint_'
                added before the name.
            partition_cols: Columns to partition by.
            mode: The write mode.
            trigger_dict: A dictionary specifying the trigger configuration for the streaming query.
                Supported keys include:

                - "processingTime": Specifies a time interval (e.g., "10 seconds") for micro-batch processing.
                - "once": Processes all available data once and then stops.
                - "continuous": Specifies a time interval (e.g., "1 second") for continuous processing.
                - "availableNow": Processes all available data immediately and then stops.

                If nothing is provided, the default is {"availableNow": True}.
            options: Additional options for writing.
            await_termination: If True, the function will wait for the streaming
                query to finish before returning. This is useful for ensuring that
                the data has been fully written before proceeding with other
                operations.
        """
        if not location or not data_frame:
            raise ValueError("Location and data_frame are required for streaming.")

        self._log_operation(location, "start")
        try:
            options = options or {}
            trigger_dict = trigger_dict or {"availableNow": True}
            checkpoint_location = self._get_checkpoint_location(location, checkpoint_location)
            self._validate_trigger(trigger_dict)
            stream_writer = self._get_stream_writer(data_frame)

            stream_writer.trigger(**trigger_dict)
            stream_writer.format(format)
            stream_writer.outputMode(mode)
            stream_writer.options(**options).option("checkpointLocation", checkpoint_location)
            if partition_cols:
                stream_writer.partitionBy(partition_cols)

            query = stream_writer.start(location)
            if await_termination is True:
                query.awaitTermination()
        except Exception as e:
            self._log_operation(location, "failed", str(e))
            raise e
        else:
            self._log_operation(location, "succeeded")

    def write(
        self,
        data_frame: DataFrame,
        location: str | None = None,
        format: str = "delta",
        partition_cols: list[str] | None = None,
        mode: str = "append",
        options: dict[str, Any] | None = None,
        **_: Any,
    ):
        """Writes a dataframe to specified location in specified format."""
        if not location:
            raise ValueError("Location is required for writing to file.")

        self._log_operation(location, "start")
        try:
            options = options or {}
            df_writer = self._get_writer(data_frame)
            df_writer.format(format)
            df_writer.mode(mode)
            if partition_cols:
                df_writer.partitionBy(partition_cols)
            df_writer.options(**options)
            df_writer.save(str(location))
        except Exception as e:
            self._log_operation(location, "failed", str(e))
            raise e
        else:
            self._log_operation(location, "succeeded")

_get_stream_writer(df)

Returns a DataStreamWriter.

Source code in src/cloe_nessy/integration/writer/file_writer.py
def _get_stream_writer(self, df: DataFrame) -> DataStreamWriter:
    """Returns a DataStreamWriter."""
    return df.writeStream

_get_writer(df)

Returns a DataFrameWriter.

Source code in src/cloe_nessy/integration/writer/file_writer.py
def _get_writer(self, df: DataFrame) -> DataFrameWriter:
    """Returns a DataFrameWriter."""
    return df.write

_log_operation(location, status, error=None)

Logs the status of an operation.

Source code in src/cloe_nessy/integration/writer/file_writer.py
def _log_operation(self, location: str, status: str, error: str | None = None):
    """Logs the status of an operation."""
    if status == "start":
        self._console_logger.info(f"Starting to write to {location}")
    elif status == "succeeded":
        self._console_logger.info(f"Successfully wrote to {location}")
    elif status == "failed":
        self._console_logger.error(f"Failed to write to {location}: {error}")

_validate_trigger(trigger_dict)

Validates the trigger type.

Source code in src/cloe_nessy/integration/writer/file_writer.py
def _validate_trigger(self, trigger_dict: dict[str, Any]):
    """Validates the trigger type."""
    triggers = ["processingTime", "once", "continuous", "availableNow"]
    if not any(trigger in trigger_dict for trigger in triggers):
        raise ValueError(f"Invalid trigger type. Supported types are: {', '.join(triggers)}")

write(data_frame, location=None, format='delta', partition_cols=None, mode='append', options=None, **_)

Writes a DataFrame to the specified location in the specified format.

Source code in src/cloe_nessy/integration/writer/file_writer.py
def write(
    self,
    data_frame: DataFrame,
    location: str | None = None,
    format: str = "delta",
    partition_cols: list[str] | None = None,
    mode: str = "append",
    options: dict[str, Any] | None = None,
    **_: Any,
):
    """Writes a dataframe to specified location in specified format."""
    if not location:
        raise ValueError("Location is required for writing to file.")

    self._log_operation(location, "start")
    try:
        options = options or {}
        df_writer = self._get_writer(data_frame)
        df_writer.format(format)
        df_writer.mode(mode)
        if partition_cols:
            df_writer.partitionBy(partition_cols)
        df_writer.options(**options)
        df_writer.save(str(location))
    except Exception as e:
        self._log_operation(location, "failed", str(e))
        raise e
    else:
        self._log_operation(location, "succeeded")

write_stream(data_frame=None, location=None, format='delta', checkpoint_location=None, partition_cols=None, mode='append', trigger_dict=None, options=None, await_termination=False, **_)

Writes a DataFrame to the specified location in the specified format as a stream.

Parameters:

    data_frame (DataFrame | None): The DataFrame to write. Default: None.
    location (str | None): The location to write the DataFrame to. Default: None.
    format (str): The format to write the DataFrame in. Default: 'delta'.
    checkpoint_location (str | None): Location of the checkpoint. If None, defaults to the location of the table being written, with '_checkpoint_' added before the name. Default: None.
    partition_cols (list[str] | None): Columns to partition by. Default: None.
    mode (str): The write mode. Default: 'append'.
    trigger_dict (dict | None): A dictionary specifying the trigger configuration for the streaming query. Supported keys include:

        - "processingTime": Specifies a time interval (e.g., "10 seconds") for micro-batch processing.
        - "once": Processes all available data once and then stops.
        - "continuous": Specifies a time interval (e.g., "1 second") for continuous processing.
        - "availableNow": Processes all available data immediately and then stops.

        If nothing is provided, the default is {"availableNow": True}. Default: None.
    options (dict[str, Any] | None): Additional options for writing. Default: None.
    await_termination (bool): If True, the function will wait for the streaming query to finish before returning. This is useful for ensuring that the data has been fully written before proceeding with other operations. Default: False.
Source code in src/cloe_nessy/integration/writer/file_writer.py
def write_stream(
    self,
    data_frame: DataFrame | None = None,
    location: str | None = None,
    format: str = "delta",
    checkpoint_location: str | None = None,
    partition_cols: list[str] | None = None,
    mode: str = "append",
    trigger_dict: dict | None = None,
    options: dict[str, Any] | None = None,
    await_termination: bool = False,
    **_: Any,
):
    """Writes a dataframe to specified location in specified format as a stream.

    Args:
        data_frame: The DataFrame to write.
        location: The location to write the DataFrame to.
        format: The format to write the DataFrame in.
        checkpoint_location: Location of checkpoint. If None, defaults
            to the location of the table being written, with '_checkpoint_'
            added before the name.
        partition_cols: Columns to partition by.
        mode: The write mode.
        trigger_dict: A dictionary specifying the trigger configuration for the streaming query.
            Supported keys include:

            - "processingTime": Specifies a time interval (e.g., "10 seconds") for micro-batch processing.
            - "once": Processes all available data once and then stops.
            - "continuous": Specifies a time interval (e.g., "1 second") for continuous processing.
            - "availableNow": Processes all available data immediately and then stops.

            If nothing is provided, the default is {"availableNow": True}.
        options: Additional options for writing.
        await_termination: If True, the function will wait for the streaming
            query to finish before returning. This is useful for ensuring that
            the data has been fully written before proceeding with other
            operations.
    """
    if not location or not data_frame:
        raise ValueError("Location and data_frame are required for streaming.")

    self._log_operation(location, "start")
    try:
        options = options or {}
        trigger_dict = trigger_dict or {"availableNow": True}
        checkpoint_location = self._get_checkpoint_location(location, checkpoint_location)
        self._validate_trigger(trigger_dict)
        stream_writer = self._get_stream_writer(data_frame)

        stream_writer.trigger(**trigger_dict)
        stream_writer.format(format)
        stream_writer.outputMode(mode)
        stream_writer.options(**options).option("checkpointLocation", checkpoint_location)
        if partition_cols:
            stream_writer.partitionBy(partition_cols)

        query = stream_writer.start(location)
        if await_termination is True:
            query.awaitTermination()
    except Exception as e:
        self._log_operation(location, "failed", str(e))
        raise e
    else:
        self._log_operation(location, "succeeded")