ReadAPIAction

Bases: PipelineAction

Reads data from an API and loads it into a Spark DataFrame.

This action uses the provided API parameters to make a request with the APIReader and returns a DataFrame containing the response data.

Example

Basic Usage:

Read API:
    action: READ_API
    options:
        base_url: https://some_url.com/api/
        endpoint: my/endpoint/

Usage with Parameters and Headers:

Read API:
    action: READ_API
    options:
        base_url: https://some_url.com/api/
        endpoint: my/endpoint/
        method: GET
        timeout: 90
        headers:
            key1: value1
            key2: value2
        params:
            key1: value1
            key2: value2

Usage with Authentication:

Read API:
    action: READ_API
    options:
        base_url: https://some_url.com/api/
        endpoint: my/endpoint/
        method: GET
        timeout: 90
        auth:
            - type: basic
              username: my_username
              password: my_password
            - type: secret_scope
              secret_scope: my_secret_scope
              header_template:
                "header_key_1": "<ENVIRONMENT_VARIABLE_NAME>"
            - type: secret_scope
              secret_scope: my_secret_scope
              header_template:
                "header_key_2": "<SECRET_NAME>"
            - type: secret_scope
              secret_scope: my_other_secret_scope
              header_template:
                "header_key_3": "<SECRET_NAME>"
            - type: azure_oauth
              client_id: my_client_id
              client_secret: my_client_secret
              tenant_id: my_tenant_id
              scope: <entra-id-client-id>

The example above combines the headers from the different auth types. The resulting headers will look like this:

{
    "header_key_1": "value_from_environment_variable",
    "header_key_2": "value_from_secret",
    "header_key_3": "value_from_secret",
    "Authorization": "Bearer <access_token> (from azure_oauth)",
    "Authorization": "Basic am9obkBleGFtcGxlLmNvbTphYmMxMjM= (from basic)"
}

Secret information

Don't write sensitive information like passwords or tokens directly in the pipeline configuration. Use secret scopes or environment variables instead.
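
For testing an action in isolation, it can also be invoked directly from Python rather than through a YAML pipeline definition. The sketch below is a minimal example: the import path is assumed from the source file shown below, and `context` is assumed to be an existing `PipelineContext` from a previous pipeline step.

```python
# Minimal sketch, assuming the import path matches the source file below
# and that `context` is an existing PipelineContext.
from cloe_nessy.pipeline.actions.read_api import ReadAPIAction

result = ReadAPIAction.run(
    context,
    base_url="https://some_url.com/api/",
    endpoint="my/endpoint/",
    method="GET",
    timeout=90,
    headers={"key1": "value1"},
    params={"key2": "value2"},
)
df = result.data  # Spark DataFrame built from the API response
```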

Source code in src/cloe_nessy/pipeline/actions/read_api.py
class ReadAPIAction(PipelineAction):
    """Reads data from an API and loads it into a Spark DataFrame.

    This method uses the provided API parameters to make a request using the
    [`APIReader`][cloe_nessy.integration.reader.api_reader] and return a
    DataFrame containing the response data.

    Example:
        === "Basic Usage"
            ```yaml
            Read API:
                action: READ_API
                options:
                    base_url: https://some_url.com/api/
                    endpoint: my/endpoint/
            ```
        === "Usage with Parameters and Headers"
            ```yaml
            Read API:
                action: READ_API
                options:
                    base_url: https://some_url.com/api/
                    endpoint: my/endpoint/
                    method: GET
                    timeout: 90
                    headers:
                        key1: value1
                        key2: value2
                    params:
                        key1: value1
                        key2: value2
            ```
        === "Usage with Authentication"
            ```yaml
            Read API:
                action: READ_API
                options:
                    base_url: https://some_url.com/api/
                    endpoint: my/endpoint/
                    method: GET
                    timeout: 90
                    auth:
                        - type: basic
                          username: my_username
                          password: my_password
                        - type: secret_scope
                          secret_scope: my_secret_scope
                          header_template:
                            "header_key_1": "<ENVIRONMENT_VARIABLE_NAME>"
                        - type: secret_scope
                          secret_scope: my_secret_scope
                          header_template:
                            "header_key_2": "<SECRET_NAME>"
                        - type: secret_scope
                          secret_scope: my_other_secret_scope
                          header_template:
                            "header_key_3": "<SECRET_NAME>"
                        - type: azure_oauth
                          client_id: my_client_id
                          client_secret: my_client_secret
                          tenant_id: my_tenant_id
                          scope: <entra-id-client-id>
            ```

            The above example will combine the headers from the different auth types. The resulting header will look like this:

            ```json
            {
                "header_key_1": "value_from_environment_variable",
                "header_key_2": "value_from_secret",
                "header_key_3": "value_from_secret",
                "Authorization": "Bearer <access_token> (from azure_oauth)",
                "Authorization": "Basic am9obkBleGFtcGxlLmNvbTphYmMxMjM= (from basic)"
            }
            ```

    !!! warning "Secret information"
        Don't write sensitive information like passwords or tokens directly in the pipeline configuration.
        Use secret scopes or environment variables instead.
    """

    name: str = "READ_API"

    @staticmethod
    def run(
        context: PipelineContext,
        *,
        base_url: str | None = None,
        auth: AuthBase | dict[str, str] | None = None,
        default_headers: dict[str, str] | None = None,
        endpoint: str = "",  # www.neo4j.de/api/table/2020/01/01
        method: str = "GET",
        key: str | None = None,
        timeout: int = 30,
        params: dict[str, str] | None = None,
        headers: dict[str, str] | None = None,
        data: dict[str, str] | None = None,
        json: dict[str, str] | None = None,
        max_retries: int = 0,
        options: dict[str, str] | None = None,
        **_: Any,
    ) -> PipelineContext:
        """Utility class for reading an API into a DataFrame.

        This class uses an APIClient to fetch data from an API and load it into a Spark DataFrame.


        Args:
            context: The pipeline context containing information about the pipeline.
            base_url: The base URL for the API to be called.
            auth: The authentication credentials for the API.
            default_headers: Default headers to include in the API request.
            endpoint: The specific API endpoint to call.
            method: The HTTP method to use for the request (default is "GET").
            key: Key for accessing specific data in the response.
            timeout: Timeout for the API request in seconds (default is 30).
            params: URL parameters to include in the API request.
            headers: Additional headers to include in the request.
            data: Data to send with the request for POST methods.
            json: JSON data to send with the request for POST methods.
            max_retries: Maximum number of retries for the API request (default is 0).
            options: Additional options for the API request.

        Returns:
            The updated pipeline context containing the DataFrame with the API response data.

        Raises:
            ValueError: If the base_url is not specified.
        """
        if not options:
            options = dict()

        if base_url is None:
            raise ValueError("base_url must be specified to fetch data from API.")

        deserialized_auth = process_auth(auth)

        api_reader = APIReader(base_url=base_url, auth=deserialized_auth, default_headers=default_headers)

        df = api_reader.read(
            method=method,
            endpoint=endpoint,
            timeout=timeout,
            params=params,
            key=key,
            headers=headers,
            data=data,
            json=json,
            max_retries=max_retries,
            options=options,
        )

        return context.from_existing(data=df)

run(context, *, base_url=None, auth=None, default_headers=None, endpoint='', method='GET', key=None, timeout=30, params=None, headers=None, data=None, json=None, max_retries=0, options=None, **_) staticmethod

Reads data from an API into a DataFrame.

Uses an APIClient to fetch the data and load it into a Spark DataFrame.

Parameters:

- context (PipelineContext, required): The pipeline context containing information about the pipeline.
- base_url (str | None, default: None): The base URL for the API to be called.
- auth (AuthBase | dict[str, str] | None, default: None): The authentication credentials for the API.
- default_headers (dict[str, str] | None, default: None): Default headers to include in the API request.
- endpoint (str, default: ''): The specific API endpoint to call.
- method (str, default: 'GET'): The HTTP method to use for the request.
- key (str | None, default: None): Key for accessing specific data in the response.
- timeout (int, default: 30): Timeout for the API request in seconds.
- params (dict[str, str] | None, default: None): URL parameters to include in the API request.
- headers (dict[str, str] | None, default: None): Additional headers to include in the request.
- data (dict[str, str] | None, default: None): Data to send with the request for POST methods.
- json (dict[str, str] | None, default: None): JSON data to send with the request for POST methods.
- max_retries (int, default: 0): Maximum number of retries for the API request.
- options (dict[str, str] | None, default: None): Additional options for the API request.

Returns:

- PipelineContext: The updated pipeline context containing the DataFrame with the API response data.

Raises:

- ValueError: If the base_url is not specified.

Source code in src/cloe_nessy/pipeline/actions/read_api.py
@staticmethod
def run(
    context: PipelineContext,
    *,
    base_url: str | None = None,
    auth: AuthBase | dict[str, str] | None = None,
    default_headers: dict[str, str] | None = None,
    endpoint: str = "",  # www.neo4j.de/api/table/2020/01/01
    method: str = "GET",
    key: str | None = None,
    timeout: int = 30,
    params: dict[str, str] | None = None,
    headers: dict[str, str] | None = None,
    data: dict[str, str] | None = None,
    json: dict[str, str] | None = None,
    max_retries: int = 0,
    options: dict[str, str] | None = None,
    **_: Any,
) -> PipelineContext:
    """Utility class for reading an API into a DataFrame.

    This class uses an APIClient to fetch data from an API and load it into a Spark DataFrame.


    Args:
        context: The pipeline context containing information about the pipeline.
        base_url: The base URL for the API to be called.
        auth: The authentication credentials for the API.
        default_headers: Default headers to include in the API request.
        endpoint: The specific API endpoint to call.
        method: The HTTP method to use for the request (default is "GET").
        key: Key for accessing specific data in the response.
        timeout: Timeout for the API request in seconds (default is 30).
        params: URL parameters to include in the API request.
        headers: Additional headers to include in the request.
        data: Data to send with the request for POST methods.
        json: JSON data to send with the request for POST methods.
        max_retries: Maximum number of retries for the API request (default is 0).
        options: Additional options for the API request.

    Returns:
        The updated pipeline context containing the DataFrame with the API response data.

    Raises:
        ValueError: If the base_url is not specified.
    """
    if not options:
        options = dict()

    if base_url is None:
        raise ValueError("base_url must be specified to fetch data from API.")

    deserialized_auth = process_auth(auth)

    api_reader = APIReader(base_url=base_url, auth=deserialized_auth, default_headers=default_headers)

    df = api_reader.read(
        method=method,
        endpoint=endpoint,
        timeout=timeout,
        params=params,
        key=key,
        headers=headers,
        data=data,
        json=json,
        max_retries=max_retries,
        options=options,
    )

    return context.from_existing(data=df)

ReadCatalogTableAction

Bases: PipelineAction

Reads a table from Unity Catalog using a specified table identifier and optional reader configurations.

This action retrieves data from a catalog table using the CatalogReader. The table is identified either by the table_identifier parameter or by the table_metadata carried in the PipelineContext from a previous step. The retrieved data is loaded into a DataFrame and returned as part of an updated PipelineContext.

Example
Read Sales Table:
    action: READ_CATALOG_TABLE
    options:
        table_identifier: my_catalog.business_schema.sales_table
        options: <options for the CatalogReader read method>
    delta_load_options:
        strategy: CDF
        delta_load_identifier: my_delta_load_id
        strategy_options:
            deduplication_columns: ["id"]
            enable_full_load: true
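
As with the other read actions, this action can be invoked directly from Python, for example when testing it in isolation. The sketch below is a minimal example: the import path is assumed from the source file shown below, and `context` is assumed to be an existing `PipelineContext`.

```python
# Minimal sketch, assuming the import path matches the source file below
# and that `context` is an existing PipelineContext.
from cloe_nessy.pipeline.actions.read_catalog_table import ReadCatalogTableAction

result = ReadCatalogTableAction.run(
    context,
    table_identifier="my_catalog.business_schema.sales_table",
    delta_load_options={  # keys mirror the YAML example above
        "strategy": "CDF",
        "delta_load_identifier": "my_delta_load_id",
        "strategy_options": {"deduplication_columns": ["id"], "enable_full_load": True},
    },
)
df = result.data  # DataFrame with the (optionally delta-loaded) table contents
```
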
Source code in src/cloe_nessy/pipeline/actions/read_catalog_table.py
class ReadCatalogTableAction(PipelineAction):
    """Reads a table from Unity Catalog using a specified table identifier and optional reader configurations.

    This function retrieves data from a catalog table using the
    [`CatalogReader`][cloe_nessy.integration.reader.catalog_reader] identified
    by either the `table_identifier` parameter or the `table_metadata` from the
    provided `PipelineContext` of a previous step. The retrieved data is loaded
    into a DataFrame and returned as part of an updated `PipelineContext`.

    Example:
        ```yaml
        Read Sales Table:
            action: READ_CATALOG_TABLE
            options:
                table_identifier: my_catalog.business_schema.sales_table
                options: <options for the CatalogReader read method>
            delta_load_options:
                strategy: CDF
                delta_load_identifier: my_delta_load_id
                strategy_options:
                    deduplication_columns: ["id"]
                    enable_full_load: true
        ```
    """

    name: str = "READ_CATALOG_TABLE"

    @staticmethod
    def run(
        context: PipelineContext,
        *,
        table_identifier: str | None = None,
        options: dict[str, str] | None = None,
        delta_load_options: dict[Any, Any] | DeltaLoadOptions | None = None,
        **_: Any,  # define kwargs to match the base class signature
    ) -> PipelineContext:
        """Reads a table from Unity Catalog using a specified table identifier and optional reader configurations.

        Args:
            context: The pipeline's context, which contains
                metadata and configuration for the action.
            table_identifier: The identifier of the catalog table to
                read. If not provided, the function will attempt to use the table
                identifier from the `table_metadata` in the `context`.
            options: A dictionary of options for customizing
                the [`CatalogReader`][cloe_nessy.integration.reader.catalog_reader]
                behavior, such as filters or reading modes. Defaults to None.
            delta_load_options: Options for delta loading, if applicable.
                Configures the [`DeltaLoader`][cloe_nessy.integration.delta_loader].

        Raises:
            ValueError: If neither `table_identifier` nor `table_metadata.identifier` in the `context` is provided.

        Returns:
            An updated pipeline context containing the data read from the catalog table as a DataFrame.
        """
        if not options:
            options = {}

        if not delta_load_options:
            delta_load_options = {}

        if (table_metadata := context.table_metadata) and table_identifier is None:
            table_identifier = table_metadata.identifier
        if table_identifier is None:
            raise ValueError("Table name must be specified or a valid Table object with identifier must be set.")

        if isinstance(delta_load_options, dict):
            delta_options_dict = delta_load_options
            if delta_load_options:
                delta_load_options = DeltaLoadOptions(**delta_load_options)
            else:
                delta_load_options = None
        else:
            delta_options_dict = delta_load_options.model_dump() if delta_load_options else {}

        runtime_info = set_delta_load_info(
            table_identifier=table_identifier,
            delta_load_options=delta_options_dict,
            runtime_info=context.runtime_info or {},
        )

        table_reader = CatalogReader()
        df = table_reader.read(
            table_identifier=table_identifier,
            options=options,
            delta_load_options=delta_load_options,
        )
        return context.from_existing(data=df, runtime_info=runtime_info)

run(context, *, table_identifier=None, options=None, delta_load_options=None, **_) staticmethod

Reads a table from Unity Catalog using a specified table identifier and optional reader configurations.

Parameters:

- context (PipelineContext, required): The pipeline's context, which contains metadata and configuration for the action.
- table_identifier (str | None, default: None): The identifier of the catalog table to read. If not provided, the function will attempt to use the table identifier from the table_metadata in the context.
- options (dict[str, str] | None, default: None): A dictionary of options for customizing the CatalogReader behavior, such as filters or reading modes.
- delta_load_options (dict[Any, Any] | DeltaLoadOptions | None, default: None): Options for delta loading, if applicable. Configures the DeltaLoader.

Raises:

- ValueError: If neither table_identifier nor table_metadata.identifier in the context is provided.

Returns:

- PipelineContext: An updated pipeline context containing the data read from the catalog table as a DataFrame.

Source code in src/cloe_nessy/pipeline/actions/read_catalog_table.py
@staticmethod
def run(
    context: PipelineContext,
    *,
    table_identifier: str | None = None,
    options: dict[str, str] | None = None,
    delta_load_options: dict[Any, Any] | DeltaLoadOptions | None = None,
    **_: Any,  # define kwargs to match the base class signature
) -> PipelineContext:
    """Reads a table from Unity Catalog using a specified table identifier and optional reader configurations.

    Args:
        context: The pipeline's context, which contains
            metadata and configuration for the action.
        table_identifier: The identifier of the catalog table to
            read. If not provided, the function will attempt to use the table
            identifier from the `table_metadata` in the `context`.
        options: A dictionary of options for customizing
            the [`CatalogReader`][cloe_nessy.integration.reader.catalog_reader]
            behavior, such as filters or reading modes. Defaults to None.
        delta_load_options: Options for delta loading, if applicable.
            Configures the [`DeltaLoader`][cloe_nessy.integration.delta_loader].

    Raises:
        ValueError: If neither `table_identifier` nor `table_metadata.identifier` in the `context` is provided.

    Returns:
        An updated pipeline context containing the data read from the catalog table as a DataFrame.
    """
    if not options:
        options = {}

    if not delta_load_options:
        delta_load_options = {}

    if (table_metadata := context.table_metadata) and table_identifier is None:
        table_identifier = table_metadata.identifier
    if table_identifier is None:
        raise ValueError("Table name must be specified or a valid Table object with identifier must be set.")

    if isinstance(delta_load_options, dict):
        delta_options_dict = delta_load_options
        if delta_load_options:
            delta_load_options = DeltaLoadOptions(**delta_load_options)
        else:
            delta_load_options = None
    else:
        delta_options_dict = delta_load_options.model_dump() if delta_load_options else {}

    runtime_info = set_delta_load_info(
        table_identifier=table_identifier,
        delta_load_options=delta_options_dict,
        runtime_info=context.runtime_info or {},
    )

    table_reader = CatalogReader()
    df = table_reader.read(
        table_identifier=table_identifier,
        options=options,
        delta_load_options=delta_load_options,
    )
    return context.from_existing(data=df, runtime_info=runtime_info)

ReadExcelAction

Bases: PipelineAction

Reads data from an Excel file or directory of Excel files and returns a DataFrame.

The action reads Excel files using the ExcelDataFrameReader, either from a single file or from a directory path. It can read specific sheets and filter files by extension, and it offers various options to customize how the data is read, such as headers, index columns, and the handling of missing values. The resulting data is returned as a DataFrame, and metadata about the read files can be included in the context.

Example
Read Excel Table:
    action: READ_EXCEL
    options:
        file: excel_file_folder/excel_files_june/interesting_excel_file.xlsx
        usecols:
            - key_column
            - interesting_column
        options: <options for the ExcelDataFrameReader read method>

More Options

The READ_EXCEL action supports additional options that can be passed to the run method. For more information, refer to the method documentation.
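
The action can also be called directly from Python, mirroring the YAML example above. This is a minimal sketch: the import path is assumed from the source file shown below, `context` is assumed to be an existing `PipelineContext`, and the action is assumed to be instantiable without arguments (its `run` method is an instance method).

```python
# Minimal sketch, assuming the import path matches the source file below
# and that `context` is an existing PipelineContext.
from cloe_nessy.pipeline.actions.read_excel import ReadExcelAction

action = ReadExcelAction()  # assumed to require no constructor arguments
result = action.run(
    context,
    file="excel_file_folder/excel_files_june/interesting_excel_file.xlsx",
    usecols=["key_column", "interesting_column"],
)
df = result.data  # Spark DataFrame with the selected Excel columns
```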

Source code in src/cloe_nessy/pipeline/actions/read_excel.py
class ReadExcelAction(PipelineAction):
    """Reads data from an Excel file or directory of Excel files and returns a DataFrame.

    The function reads Excel files using the
    [`ExcelDataFrameReader`][cloe_nessy.integration.reader.excel_reader] either
    from a single file or a directory path. It can read specific sheets, handle
    file extensions, and offers various options to customize how the data is
    read, such as specifying headers, index columns, and handling missing
    values. The resulting data is returned as a DataFrame, and metadata about
    the read files can be included in the context.

    Example:
        ```yaml
        Read Excel Table:
            action: READ_EXCEL
            options:
                file: excel_file_folder/excel_files_june/interesting_excel_file.xlsx
                usecols:
                    - key_column
                    - interesting_column
                options: <options for the ExcelDataFrameReader read method>
        ```

    !!! note "More Options"
        The `READ_EXCEL` action supports additional options that can be passed to the
        run method. For more information, refer to the method documentation.
    """

    name: str = "READ_EXCEL"

    def run(
        self,
        context: PipelineContext,
        *,
        file: str | None = None,
        path: str | None = None,
        extension: str = "xlsx",
        recursive: bool = False,
        sheet_name: str | int | list = 0,
        sheet_name_as_column: bool = False,
        header: int | list[int] = 0,
        index_col: int | list[int] | None = None,
        usecols: int | str | list | Callable | None = None,
        dtype: str | None = None,
        fillna: str | dict[str, list[str]] | dict[str, str] | None = None,
        true_values: list | None = None,
        false_values: list | None = None,
        nrows: int | None = None,
        na_values: list[str] | dict[str, list[str]] | None = None,
        keep_default_na: bool = True,
        parse_dates: bool | list | dict = False,
        date_parser: Callable | None = None,
        thousands: str | None = None,
        include_index: bool = False,
        options: dict | None = None,
        add_metadata_column: bool = True,
        load_as_strings: bool = False,
        **_,
    ) -> PipelineContext:
        """Reads data from an Excel file or directory of Excel files and returns a DataFrame.

        Args:
            context: The context in which the action is executed.
            file: The path to a single Excel file. Either `file` or `path` must be specified.
            path: The directory path containing multiple Excel files. Either `file` or `path` must be specified.
            extension: The file extension to look for when reading from a directory.
            recursive: Whether to include subdirectories when reading from a directory path.
            sheet_name: The sheet name(s) or index(es) to read from the Excel file.
            sheet_name_as_column: Whether to add a column with the sheet name to the DataFrame.
            header: Row number(s) to use as the column labels.
            index_col: Column(s) to use as the index of the DataFrame.
            usecols: Subset of columns to parse. Can be an integer, string, list,
                or function.
            dtype: Data type for the columns.
            fillna: Method or value to use to fill NaN values.
            true_values: Values to consider as True.
            false_values: Values to consider as False.
            nrows: Number of rows to parse.
            na_values: Additional strings to recognize as NaN/NA.
            keep_default_na: Whether to append default NaN values when custom `na_values` are specified.
            parse_dates: Options for parsing date columns.
            date_parser: Function to use for converting strings to datetime objects.
            thousands: Thousands separator to use when parsing numeric columns.
            include_index: Whether to include an index column in the output DataFrame.
            options: Additional options to pass to the DataFrame reader.
            add_metadata_column: Whether to add a metadata column with file information to the DataFrame.
            load_as_strings: Whether to load all columns as strings.

        Raises:
            ValueError: Raised if both `file` and `path` are specified, or if neither is provided.

        Returns:
            The updated context, with the read data as a DataFrame.
        """
        if not options:
            options = dict()

        if file is not None and path is not None:
            self._tabular_logger.error("message: Only one of file or path may be specified.")
            raise ValueError("Only one of file or path may be specified.")

        excel_reader = ExcelDataFrameReader()
        if file is not None:
            df = excel_reader.read(
                location=file,
                sheet_name=sheet_name,
                sheet_name_as_column=sheet_name_as_column,
                header=header,
                index_col=index_col,
                usecols=usecols,
                true_values=true_values,
                false_values=false_values,
                nrows=nrows,
                dtype=dtype,
                fillna=fillna,
                na_values=na_values,
                keep_default_na=keep_default_na,
                parse_dates=parse_dates,
                date_parser=date_parser,
                thousands=thousands,
                include_index=include_index,
                options=options,
                add_metadata_column=add_metadata_column,
                load_as_strings=load_as_strings,
            )
        elif path is not None:
            file_list = get_file_paths(path, extension, recursive)
            df_dict: dict = {}
            for path in file_list:
                df_dict[path] = excel_reader.read(
                    location=path,
                    sheet_name=sheet_name,
                    sheet_name_as_column=sheet_name_as_column,
                    header=header,
                    index_col=index_col,
                    usecols=usecols,
                    dtype=dtype,
                    fillna=fillna,
                    true_values=true_values,
                    false_values=false_values,
                    nrows=nrows,
                    na_values=na_values,
                    keep_default_na=keep_default_na,
                    parse_dates=parse_dates,
                    date_parser=date_parser,
                    thousands=thousands,
                    include_index=include_index,
                    options=options,
                    add_metadata_column=add_metadata_column,
                    load_as_strings=load_as_strings,
                )
            df = reduce(DataFrame.unionAll, list(df_dict.values()))

        else:
            self._tabular_logger.error("action_name: READ_EXCEL | message: Either file or path must be specified.")
            raise ValueError("Either file or path must be specified.")

        runtime_info = context.runtime_info

        if add_metadata_column:
            read_files_list = list(set([x.file_path for x in df.select("__metadata.file_path").collect()]))
            if runtime_info is None:
                runtime_info = {"read_files": read_files_list}
            else:
                try:
                    runtime_info["read_files"] = list(set(runtime_info["read_files"] + read_files_list))
                except KeyError:
                    runtime_info["read_files"] = read_files_list

        # Propagate the collected runtime info (e.g. read_files) together with the data.
        return context.from_existing(data=df, runtime_info=runtime_info)

run(context, *, file=None, path=None, extension='xlsx', recursive=False, sheet_name=0, sheet_name_as_column=False, header=0, index_col=None, usecols=None, dtype=None, fillna=None, true_values=None, false_values=None, nrows=None, na_values=None, keep_default_na=True, parse_dates=False, date_parser=None, thousands=None, include_index=False, options=None, add_metadata_column=True, load_as_strings=False, **_)

Reads data from an Excel file or directory of Excel files and returns a DataFrame.

Parameters:

- context (PipelineContext, required): The context in which the action is executed.
- file (str | None, default: None): The path to a single Excel file. Either file or path must be specified.
- path (str | None, default: None): The directory path containing multiple Excel files. Either file or path must be specified.
- extension (str, default: 'xlsx'): The file extension to look for when reading from a directory.
- recursive (bool, default: False): Whether to include subdirectories when reading from a directory path.
- sheet_name (str | int | list, default: 0): The sheet name(s) or index(es) to read from the Excel file.
- sheet_name_as_column (bool, default: False): Whether to add a column with the sheet name to the DataFrame.
- header (int | list[int], default: 0): Row number(s) to use as the column labels.
- index_col (int | list[int] | None, default: None): Column(s) to use as the index of the DataFrame.
- usecols (int | str | list | Callable | None, default: None): Subset of columns to parse. Can be an integer, string, list, or function.
- dtype (str | None, default: None): Data type for the columns.
- fillna (str | dict[str, list[str]] | dict[str, str] | None, default: None): Method or value to use to fill NaN values.
- true_values (list | None, default: None): Values to consider as True.
- false_values (list | None, default: None): Values to consider as False.
- nrows (int | None, default: None): Number of rows to parse.
- na_values (list[str] | dict[str, list[str]] | None, default: None): Additional strings to recognize as NaN/NA.
- keep_default_na (bool, default: True): Whether to append default NaN values when custom na_values are specified.
- parse_dates (bool | list | dict, default: False): Options for parsing date columns.
- date_parser (Callable | None, default: None): Function to use for converting strings to datetime objects.
- thousands (str | None, default: None): Thousands separator to use when parsing numeric columns.
- include_index (bool, default: False): Whether to include an index column in the output DataFrame.
- options (dict | None, default: None): Additional options to pass to the DataFrame reader.
- add_metadata_column (bool, default: True): Whether to add a metadata column with file information to the DataFrame.
- load_as_strings (bool, default: False): Whether to load all columns as strings.

Raises:

- ValueError: Raised if both file and path are specified, or if neither is provided.

Returns:

- PipelineContext: The updated context, with the read data as a DataFrame.

Source code in src/cloe_nessy/pipeline/actions/read_excel.py
def run(
    self,
    context: PipelineContext,
    *,
    file: str | None = None,
    path: str | None = None,
    extension: str = "xlsx",
    recursive: bool = False,
    sheet_name: str | int | list = 0,
    sheet_name_as_column: bool = False,
    header: int | list[int] = 0,
    index_col: int | list[int] | None = None,
    usecols: int | str | list | Callable | None = None,
    dtype: str | None = None,
    fillna: str | dict[str, list[str]] | dict[str, str] | None = None,
    true_values: list | None = None,
    false_values: list | None = None,
    nrows: int | None = None,
    na_values: list[str] | dict[str, list[str]] | None = None,
    keep_default_na: bool = True,
    parse_dates: bool | list | dict = False,
    date_parser: Callable | None = None,
    thousands: str | None = None,
    include_index: bool = False,
    options: dict | None = None,
    add_metadata_column: bool = True,
    load_as_strings: bool = False,
    **_,
) -> PipelineContext:
    """Reads data from an Excel file or directory of Excel files and returns a DataFrame.

    Args:
        context: The context in which the action is executed.
        file: The path to a single Excel file. Either `file` or `path` must be specified.
        path: The directory path containing multiple Excel files. Either `file` or `path` must be specified.
        extension: The file extension to look for when reading from a directory.
        recursive: Whether to include subdirectories when reading from a directory path.
        sheet_name: The sheet name(s) or index(es) to read from the Excel file.
        sheet_name_as_column: Whether to add a column with the sheet name to the DataFrame.
        header: Row number(s) to use as the column labels.
        index_col: Column(s) to use as the index of the DataFrame.
        usecols: Subset of columns to parse. Can be an integer, string, list,
            or function.
        dtype: Data type for the columns.
        fillna: Method or value to use to fill NaN values.
        true_values: Values to consider as True.
        false_values: Values to consider as False.
        nrows: Number of rows to parse.
        na_values: Additional strings to recognize as NaN/NA.
        keep_default_na: Whether to append default NaN values when custom `na_values` are specified.
        parse_dates: Options for parsing date columns.
        date_parser: Function to use for converting strings to datetime objects.
        thousands: Thousands separator to use when parsing numeric columns.
        include_index: Whether to include an index column in the output DataFrame.
        options: Additional options to pass to the DataFrame reader.
        add_metadata_column: Whether to add a metadata column with file information to the DataFrame.
        load_as_strings: Whether to load all columns as strings.

    Raises:
        ValueError: Raised if both `file` and `path` are specified, or if neither is provided.

    Returns:
        The updated context, with the read data as a DataFrame.
    """
    if not options:
        options = dict()

    if file is not None and path is not None:
        self._tabular_logger.error("message: Only one of file or path may be specified.")
        raise ValueError("Only one of file or path may be specified.")

    excel_reader = ExcelDataFrameReader()
    if file is not None:
        df = excel_reader.read(
            location=file,
            sheet_name=sheet_name,
            sheet_name_as_column=sheet_name_as_column,
            header=header,
            index_col=index_col,
            usecols=usecols,
            true_values=true_values,
            false_values=false_values,
            nrows=nrows,
            dtype=dtype,
            fillna=fillna,
            na_values=na_values,
            keep_default_na=keep_default_na,
            parse_dates=parse_dates,
            date_parser=date_parser,
            thousands=thousands,
            include_index=include_index,
            options=options,
            add_metadata_column=add_metadata_column,
            load_as_strings=load_as_strings,
        )
    elif path is not None:
        file_list = get_file_paths(path, extension, recursive)
        df_dict: dict = {}
        for path in file_list:
            df_dict[path] = excel_reader.read(
                location=path,
                sheet_name=sheet_name,
                sheet_name_as_column=sheet_name_as_column,
                header=header,
                index_col=index_col,
                usecols=usecols,
                dtype=dtype,
                fillna=fillna,
                true_values=true_values,
                false_values=false_values,
                nrows=nrows,
                na_values=na_values,
                keep_default_na=keep_default_na,
                parse_dates=parse_dates,
                date_parser=date_parser,
                thousands=thousands,
                include_index=include_index,
                options=options,
                add_metadata_column=add_metadata_column,
                load_as_strings=load_as_strings,
            )
        df = reduce(DataFrame.unionAll, list(df_dict.values()))

    else:
        self._tabular_logger.error("action_name: READ_EXCEL | message: Either file or path must be specified.")
        raise ValueError("Either file or path must be specified.")

    runtime_info = context.runtime_info

    if add_metadata_column:
        read_files_list = list(set([x.file_path for x in df.select("__metadata.file_path").collect()]))
        if runtime_info is None:
            runtime_info = {"read_files": read_files_list}
        else:
            try:
                runtime_info["read_files"] = list(set(runtime_info["read_files"] + read_files_list))
            except KeyError:
                runtime_info["read_files"] = read_files_list

    # Propagate the collected runtime info (e.g. read_files) together with the data.
    return context.from_existing(data=df, runtime_info=runtime_info)

ReadFilesAction

Bases: PipelineAction

Reads files from a specified location.

If an extension is provided, all files with the given extension will be read using the FileReader. If no extension is provided, the spark_format must be set, and all files in the location will be read using a DataFrameReader with the specified format.

Example

Read files specified by spark_format:

Read Files:
    action: READ_FILES
    options:
        location: json_file_folder/
        search_subdirs: True
        spark_format: JSON

Define Spark Format

Use the spark_format option to specify the format with which to read the files. Supported formats include CSV, JSON, PARQUET, TEXT, and XML.

Read files specified by extension:

Read Files:
    action: READ_FILES
    options:
        location: csv_file_folder/
        search_subdirs: True
        extension: csv

Define Extension

Use the extension option to specify the extension of the files to read. If spark_format is not specified, it will be inferred from the extension.

Read files with a specified spark_format AND extension:

Read Files:
    action: READ_FILES
    options:
        location: file_folder/
        extension: abc_custom_extension  # specifies the files to read
        spark_format: CSV  # specifies the format to read the files with

Define both Extension & Spark Format

Use the extension option to specify the extension of the files to read. Additionally, use the spark_format option to specify the format with which to read the files.

Read Delta Lake table with delta loading:

Read Delta Files:
    action: READ_FILES
    options:
        location: /path/to/delta/table
        spark_format: delta
    delta_load_options:
        strategy: CDF
        delta_load_identifier: my_delta_files_load
        strategy_options:
            deduplication_columns: ["id"]
            enable_full_load: false

Delta Loading for Files

Use delta_load_options when reading Delta Lake tables to enable incremental loading. This works with both CDF and timestamp strategies.
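
A direct Python invocation, for example for ad-hoc testing, looks like the following sketch. The import path is assumed from the source file shown below, and `context` is assumed to be an existing `PipelineContext`.

```python
# Minimal sketch, assuming the import path matches the source file below
# and that `context` is an existing PipelineContext.
from cloe_nessy.pipeline.actions.read_files import ReadFilesAction

result = ReadFilesAction.run(
    context,
    location="json_file_folder/",
    spark_format="JSON",
    search_subdirs=True,
)
df = result.data  # Spark DataFrame with the contents of the matched files
read_files = (result.runtime_info or {}).get("read_files", [])  # file paths that were read
```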

Source code in src/cloe_nessy/pipeline/actions/read_files.py
class ReadFilesAction(PipelineAction):
    """Reads files from a specified location.

    If an extension is provided, all files with the given extension will be read
    using the [`FileReader`][cloe_nessy.integration.reader.file_reader]. If no
    extension is provided, the `spark_format` must be set, and all files in the
    location will be read using a DataFrameReader with the specified format.

    Example:
        === "Read files specified by spark_format"
            ```yaml
            Read Files:
                action: READ_FILES
                options:
                    location: json_file_folder/
                    search_subdirs: True
                    spark_format: JSON
            ```
            !!! note "Define Spark Format"
                Use the `spark_format` option to specify the format with which
                to read the files. Supported formats are e.g., `CSV`, `JSON`,
                `PARQUET`, `TEXT`, and `XML`.

        === "Read files specified by extension"
            ```yaml
            Read Files:
                action: READ_FILES
                options:
                    location: csv_file_folder/
                    search_subdirs: True
                    extension: csv
            ```
            !!! note "Define Extension"
                Use the `extension` option to specify the extension of the files
                to read. If `spark_format` is not specified, it will be inferred
                from the extension.

        === "Read files with a specified spark_format AND extension"
            ```yaml
            Read Files:
                action: READ_FILES
                options:
                    location: file_folder/
                    extension: abc_custom_extension  # specifies the files to read
                    spark_format: CSV  # specifies the format to read the files with
            ```
            !!! note "Define both Extension & Spark Format"
                Use the `extension` option to specify the extension of the files
                to read. Additionally, use the `spark_format` option to specify
                the format with which to read the files.

        === "Read Delta Lake table with delta loading"
            ```yaml
            Read Delta Files:
                action: READ_FILES
                options:
                    location: /path/to/delta/table
                    spark_format: delta
                delta_load_options:
                    strategy: CDF
                    delta_load_identifier: my_delta_files_load
                    strategy_options:
                        deduplication_columns: ["id"]
                        enable_full_load: false
            ```
            !!! note "Delta Loading for Files"
                Use `delta_load_options` when reading Delta Lake tables to enable
                incremental loading. This works with both CDF and timestamp strategies.
    """

    name: str = "READ_FILES"

    @staticmethod
    def run(
        context: PipelineContext,
        *,
        location: str | None = None,
        search_subdirs: bool = False,
        extension: str | None = None,
        spark_format: str | None = None,
        schema: str | None = None,
        add_metadata_column: bool = True,
        options: dict[str, str] | None = None,
        delta_load_options: dict[Any, Any] | DeltaLoadOptions | None = None,
        **_: Any,
    ) -> PipelineContext:
        """Reads files from a specified location.

        Args:
            context: The context in which this Action is executed.
            location: The location from which to read files.
            search_subdirs: Recursively search subdirectories for files
                if an extension is provided.
            extension: The file extension to filter files by.
            spark_format: The format to use for reading the files. If not provided,
                it will be inferred from the file extension.
            schema: The schema of the data. If None, schema is obtained from
                the context metadata.
            add_metadata_column: Whether to include the `__metadata` column with
                file metadata in the DataFrame.
            options: Additional options passed to the reader.
            delta_load_options: Options for delta loading, if applicable. When provided
                for Delta format files, enables incremental loading using delta loader strategies.

        Raises:
            ValueError: If neither `extension` nor `spark_format` are provided, or if
                no location is specified.

        Returns:
            The context after the Action has been executed, containing the read data as a DataFrame.
        """
        if not location:
            raise ValueError("No location provided. Please specify location to read files from.")
        if not options:
            options = dict()
        if not spark_format and not extension:
            raise ValueError("Either spark_format or extension must be provided.")

        if (metadata := context.table_metadata) and schema is None:
            schema = metadata.schema

        # Convert dict to DeltaLoadOptions if needed
        if isinstance(delta_load_options, dict):
            delta_load_options = DeltaLoadOptions(**delta_load_options)

        # Set up runtime info for delta loading
        runtime_info = context.runtime_info or {}
        if delta_load_options:
            # Convert DeltaLoadOptions to dict for runtime info storage
            delta_options_dict = (
                delta_load_options.model_dump()
                if isinstance(delta_load_options, DeltaLoadOptions)
                else delta_load_options
            )
            runtime_info = set_delta_load_info(
                table_identifier=location,  # Use location as identifier for file-based delta loading
                delta_load_options=delta_options_dict,
                runtime_info=runtime_info,
            )

        file_reader = FileReader()
        df = file_reader.read(
            location=location,
            schema=schema,
            extension=extension,
            spark_format=spark_format,
            search_subdirs=search_subdirs,
            options=options,
            add_metadata_column=add_metadata_column,
            delta_load_options=delta_load_options,
        )

        # Only process metadata column if it exists and wasn't using delta loading
        if add_metadata_column and "__metadata" in df.columns:
            read_files_list = [x.file_path for x in df.select("__metadata.file_path").drop_duplicates().collect()]
            if runtime_info is None:
                runtime_info = {"read_files": read_files_list}
            else:
                try:
                    runtime_info["read_files"] = list(set(runtime_info["read_files"] + read_files_list))
                except KeyError:
                    runtime_info["read_files"] = read_files_list

        return context.from_existing(data=df, runtime_info=runtime_info)

run(context, *, location=None, search_subdirs=False, extension=None, spark_format=None, schema=None, add_metadata_column=True, options=None, delta_load_options=None, **_) staticmethod

Reads files from a specified location.

Parameters:

- context (PipelineContext, required): The context in which this Action is executed.
- location (str | None, default: None): The location from which to read files.
- search_subdirs (bool, default: False): Recursively search subdirectories for files if an extension is provided.
- extension (str | None, default: None): The file extension to filter files by.
- spark_format (str | None, default: None): The format to use for reading the files. If not provided, it will be inferred from the file extension.
- schema (str | None, default: None): The schema of the data. If None, the schema is obtained from the context metadata.
- add_metadata_column (bool, default: True): Whether to include the __metadata column with file metadata in the DataFrame.
- options (dict[str, str] | None, default: None): Additional options passed to the reader.
- delta_load_options (dict[Any, Any] | DeltaLoadOptions | None, default: None): Options for delta loading, if applicable. When provided for Delta format files, enables incremental loading using delta loader strategies.

Raises:

- ValueError: If neither extension nor spark_format are provided, or if no location is specified.

Returns:

- PipelineContext: The context after the Action has been executed, containing the read data as a DataFrame.

Source code in src/cloe_nessy/pipeline/actions/read_files.py
@staticmethod
def run(
    context: PipelineContext,
    *,
    location: str | None = None,
    search_subdirs: bool = False,
    extension: str | None = None,
    spark_format: str | None = None,
    schema: str | None = None,
    add_metadata_column: bool = True,
    options: dict[str, str] | None = None,
    delta_load_options: dict[Any, Any] | DeltaLoadOptions | None = None,
    **_: Any,
) -> PipelineContext:
    """Reads files from a specified location.

    Args:
        context: The context in which this Action is executed.
        location: The location from which to read files.
        search_subdirs: Recursively search subdirectories for files
            if an extension is provided.
        extension: The file extension to filter files by.
        spark_format: The format to use for reading the files. If not provided,
            it will be inferred from the file extension.
        schema: The schema of the data. If None, schema is obtained from
            the context metadata.
        add_metadata_column: Whether to include the `__metadata` column with
            file metadata in the DataFrame.
        options: Additional options passed to the reader.
        delta_load_options: Options for delta loading, if applicable. When provided
            for Delta format files, enables incremental loading using delta loader strategies.

    Raises:
        ValueError: If neither `extension` nor `spark_format` are provided, or if
            no location is specified.

    Returns:
        The context after the Action has been executed, containing the read data as a DataFrame.
    """
    if not location:
        raise ValueError("No location provided. Please specify location to read files from.")
    if not options:
        options = dict()
    if not spark_format and not extension:
        raise ValueError("Either spark_format or extension must be provided.")

    if (metadata := context.table_metadata) and schema is None:
        schema = metadata.schema

    # Convert dict to DeltaLoadOptions if needed
    if isinstance(delta_load_options, dict):
        delta_load_options = DeltaLoadOptions(**delta_load_options)

    # Set up runtime info for delta loading
    runtime_info = context.runtime_info or {}
    if delta_load_options:
        # Convert DeltaLoadOptions to dict for runtime info storage
        delta_options_dict = (
            delta_load_options.model_dump()
            if isinstance(delta_load_options, DeltaLoadOptions)
            else delta_load_options
        )
        runtime_info = set_delta_load_info(
            table_identifier=location,  # Use location as identifier for file-based delta loading
            delta_load_options=delta_options_dict,
            runtime_info=runtime_info,
        )

    file_reader = FileReader()
    df = file_reader.read(
        location=location,
        schema=schema,
        extension=extension,
        spark_format=spark_format,
        search_subdirs=search_subdirs,
        options=options,
        add_metadata_column=add_metadata_column,
        delta_load_options=delta_load_options,
    )

    # Only process metadata column if it exists and wasn't using delta loading
    if add_metadata_column and "__metadata" in df.columns:
        read_files_list = [x.file_path for x in df.select("__metadata.file_path").drop_duplicates().collect()]
        if runtime_info is None:
            runtime_info = {"read_files": read_files_list}
        else:
            try:
                runtime_info["read_files"] = list(set(runtime_info["read_files"] + read_files_list))
            except KeyError:
                runtime_info["read_files"] = read_files_list

    return context.from_existing(data=df, runtime_info=runtime_info)

ReadMetadataYAMLAction

Bases: PipelineAction

Reads schema metadata from a yaml file using the Schema model.

Example
Read Schema Metadata:
    action: READ_METADATA_YAML_ACTION
    options:
        path: excel_file_folder/excel_files_june/
        file_name: sales_schema.yml
        table_name: sales
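
Invoked directly from Python, the action returns a context whose table_metadata holds the selected table definition. This is a minimal sketch: the import path is assumed from the source file shown below, and `context` is assumed to be an existing `PipelineContext`.

```python
# Minimal sketch, assuming the import path matches the source file below
# and that `context` is an existing PipelineContext.
from cloe_nessy.pipeline.actions.read_metadata_yaml import ReadMetadataYAMLAction

result = ReadMetadataYAMLAction.run(
    context,
    path="excel_file_folder/excel_files_june/",
    file_name="sales_schema.yml",
    table_name="sales",
)
table = result.table_metadata  # metadata for the "sales" table
```
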
Source code in src/cloe_nessy/pipeline/actions/read_metadata_yaml.py
class ReadMetadataYAMLAction(PipelineAction):
    """Reads schema metadata from a yaml file using the [`Schema`][cloe_nessy.models.schema] model.

    Example:
        ```yaml
        Read Schema Metadata:
            action: READ_METADATA_YAML_ACTION
            options:
                path: excel_file_folder/excel_files_june/
                file_name: sales_schema.yml
                table_name: sales
        ```
    """

    name: str = "READ_METADATA_YAML_ACTION"

    @staticmethod
    def run(
        context: PipelineContext,
        *,
        path: str | None = None,
        file_name: str | None = None,
        table_name: str | None = None,
        **_: Any,
    ) -> PipelineContext:
        """Reads schema metadata from a yaml file using the [`Schema`][cloe_nessy.models.schema] model.

        Args:
            context: The context in which this Action is executed.
            path: The path to the data contract directory.
            file_name: The name of the file that defines the schema.
            table_name: The name of the table for which to retrieve metadata.

        Raises:
            ValueError: If any issues occur while reading the schema, such as an invalid schema,
                missing file, or missing path.

        Returns:
            The context after the execution of this Action, containing the table metadata.
        """
        if not path:
            raise ValueError("No path provided. Please specify path to schema metadata.")
        if not file_name:
            raise ValueError("No file_name provided. Please specify file name.")
        if not table_name:
            raise ValueError("No table_name provided. Please specify table name.")

        path_obj = pathlib.Path(path)

        schema, errors = Schema.read_instance_from_file(path_obj / file_name)
        if errors:
            raise ValueError(f"Errors while reading schema metadata: {errors}")
        if not schema:
            raise ValueError("No schema found in metadata.")

        table = schema.get_table_by_name(table_name=table_name)

        return context.from_existing(table_metadata=table)

run(context, *, path=None, file_name=None, table_name=None, **_) staticmethod

Reads schema metadata from a yaml file using the Schema model.

Parameters:

- context (PipelineContext, required): The context in which this Action is executed.
- path (str | None, default: None): The path to the data contract directory.
- file_name (str | None, default: None): The name of the file that defines the schema.
- table_name (str | None, default: None): The name of the table for which to retrieve metadata.

Raises:

- ValueError: If any issues occur while reading the schema, such as an invalid schema, missing file, or missing path.

Returns:

- PipelineContext: The context after the execution of this Action, containing the table metadata.

Source code in src/cloe_nessy/pipeline/actions/read_metadata_yaml.py
@staticmethod
def run(
    context: PipelineContext,
    *,
    path: str | None = None,
    file_name: str | None = None,
    table_name: str | None = None,
    **_: Any,
) -> PipelineContext:
    """Reads schema metadata from a yaml file using the [`Schema`][cloe_nessy.models.schema] model.

    Args:
        context: The context in which this Action is executed.
        path: The path to the data contract directory.
        file_name: The name of the file that defines the schema.
        table_name: The name of the table for which to retrieve metadata.

    Raises:
        ValueError: If any issues occur while reading the schema, such as an invalid schema,
            missing file, or missing path.

    Returns:
        The context after the execution of this Action, containing the table metadata.
    """
    if not path:
        raise ValueError("No path provided. Please specify path to schema metadata.")
    if not file_name:
        raise ValueError("No file_name provided. Please specify file name.")
    if not table_name:
        raise ValueError("No table_name provided. Please specify table name.")

    path_obj = pathlib.Path(path)

    schema, errors = Schema.read_instance_from_file(path_obj / file_name)
    if errors:
        raise ValueError(f"Errors while reading schema metadata: {errors}")
    if not schema:
        raise ValueError("No schema found in metadata.")

    table = schema.get_table_by_name(table_name=table_name)

    return context.from_existing(table_metadata=table)

TransformChangeDatatypeAction

Bases: PipelineAction

Changes the datatypes of specified columns in the given DataFrame.

Data Types

We make use of the PySpark cast function to change the data types of the columns. Valid data types can be found in the PySpark documentation.

Example
Cast Columns:
    action: TRANSFORM_CHANGE_DATATYPE
    options:
        columns:
            id: string
            revenue: long
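
For reference, the cast performed by this action mirrors the `withColumns`/`cast` call shown in the source below. The following is a minimal standalone sketch with an illustrative DataFrame; the column names `id` and `revenue` are taken from the example above:

```python
# Minimal sketch of the equivalent PySpark cast; the DataFrame is illustrative.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, "100")], ["id", "revenue"])

columns = {"id": "string", "revenue": "long"}  # column name -> target datatype
df = df.withColumns({col: F.col(col).cast(dtype) for col, dtype in columns.items()})
df.printSchema()  # id: string, revenue: long
```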
Source code in src/cloe_nessy/pipeline/actions/transform_change_datatype.py
class TransformChangeDatatypeAction(PipelineAction):
    """Changes the datatypes of specified columns in the given DataFrame.

    !!! note "Data Types"
        We make use of the PySpark `cast` function to change the data types of
        the columns. Valid data types can be found in the [PySpark
        documentation](https://spark.apache.org/docs/3.5.3/sql-ref-datatypes.html).

    Example:
        ```yaml
        Cast Columns:
            action: TRANSFORM_CHANGE_DATATYPE
            options:
                columns:
                    id: string
                    revenue: long
        ```
    """

    name: str = "TRANSFORM_CHANGE_DATATYPE"

    def run(
        self,
        context: PipelineContext,
        *,
        columns: dict[str, str] | None = None,
        **_: Any,  # define kwargs to match the base class signature
    ) -> PipelineContext:
        """Changes the datatypes of specified columns in the given DataFrame.

        Args:
            context: The context in which this Action is executed.
            columns: A dictionary where the key is the column
                name and the value is the desired datatype.

        Raises:
            ValueError: If no columns are provided.
            ValueError: If the data from context is None.

        Returns:
            The context after the execution of this Action, containing the DataFrame with updated column datatypes.
        """
        if not columns:
            raise ValueError("No columns provided.")

        if context.data is None:
            raise ValueError("Data from the context is required for the operation.")

        df = context.data
        change_columns = {col: F.col(col).cast(dtype) for col, dtype in columns.items()}
        df = df.withColumns(change_columns)  # type: ignore

        return context.from_existing(data=df)  # type: ignore

run(context, *, columns=None, **_)

Changes the datatypes of specified columns in the given DataFrame.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `context` | `PipelineContext` | The context in which this Action is executed. | *required* |
| `columns` | `dict[str, str] \| None` | A dictionary where the key is the column name and the value is the desired datatype. | `None` |

Raises:

| Type | Description |
|------|-------------|
| `ValueError` | If no columns are provided. |
| `ValueError` | If the data from context is None. |

Returns:

| Type | Description |
|------|-------------|
| `PipelineContext` | The context after the execution of this Action, containing the DataFrame with updated column datatypes. |

Source code in src/cloe_nessy/pipeline/actions/transform_change_datatype.py
def run(
    self,
    context: PipelineContext,
    *,
    columns: dict[str, str] | None = None,
    **_: Any,  # define kwargs to match the base class signature
) -> PipelineContext:
    """Changes the datatypes of specified columns in the given DataFrame.

    Args:
        context: The context in which this Action is executed.
        columns: A dictionary where the key is the column
            name and the value is the desired datatype.

    Raises:
        ValueError: If no columns are provided.
        ValueError: If the data from context is None.

    Returns:
        The context after the execution of this Action, containing the DataFrame with updated column datatypes.
    """
    if not columns:
        raise ValueError("No columns provided.")

    if context.data is None:
        raise ValueError("Data from the context is required for the operation.")

    df = context.data
    change_columns = {col: F.col(col).cast(dtype) for col, dtype in columns.items()}
    df = df.withColumns(change_columns)  # type: ignore

    return context.from_existing(data=df)  # type: ignore

TransformCleanColumnNamesAction

Bases: PipelineAction

Fixes column names in the DataFrame to be valid.

Removes invalid characters from the column names (including the fields of structs) and replaces a single leading underscore with a double underscore.

Invalid characters include:

- Any non-word character (anything other than letters, digits, and underscores).
- A single leading underscore.
Example
Clean Column Names:
    action: TRANSFORM_CLEAN_COLUMN_NAMES
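
To illustrate the renaming rule only (the regular expressions are the ones used in the source below; the column names are made up):

```python
# Illustrative sketch of the renaming rule applied to each column name.
import re

def clean(name: str) -> str:
    name = re.sub(r"\W", "_", name)           # replace non-word characters with underscores
    return re.sub(r"^_(?=[^_])", "__", name)  # double a single leading underscore

print(clean("order id (EUR)"))  # order_id__EUR_
print(clean("_loadDate"))       # __loadDate
```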
Source code in src/cloe_nessy/pipeline/actions/transform_clean_column_names.py
class TransformCleanColumnNamesAction(PipelineAction):
    """Fixes column names in the DataFrame to be valid.

    Removes invalid characters from the column names, including the fields of a struct and
    replaces a single leading underscore by a double underscore.

    Invalid characters include:
        - Any non-word character (anything other than letters, digits, and underscores).
        - A single leading underscore.

    Example:
        ```yaml
        Clean Column Names:
            action: TRANSFORM_CLEAN_COLUMN_NAMES
        ```
    """

    name: str = "TRANSFORM_CLEAN_COLUMN_NAMES"

    def run(
        self,
        context: PipelineContext,
        **_: Any,
    ) -> PipelineContext:
        """Fixes column names in the DataFrame to be valid.

        Removes invalid characters from the column names, including the fields of a struct and
        replaces a single leading underscore by a double underscore.

        Args:
            context: The context in which this Action is executed.

        Raises:
            ValueError: If the data from the context is None.

        Returns:
            The context after the execution of this Action, containing the DataFrame with cleaned column names.
        """
        if context.data is None:
            raise ValueError("Data from the context is required for the operation.")

        with_columns_renamed = {}
        with_columns_casted: dict[str, T.StructType | T.ArrayType | T.MapType] = {}

        single_underscrore_at_beginning = r"^_(?=[^_])"

        for c in context.data.schema:
            old_name = c.name
            new_name = re.sub(single_underscrore_at_beginning, "__", re.sub(r"\W", "_", old_name))
            with_columns_renamed[old_name] = new_name

            if isinstance(c.dataType, (T.StructType | T.ArrayType | T.MapType)):
                old_column_schema = c.dataType.json()
                new_column_schema = re.sub(
                    r'(?<="name":")[^"]+',
                    lambda m: re.sub(r"\W", "_", str(m.group())),
                    old_column_schema,
                )
                if isinstance(c.dataType, T.StructType):
                    with_columns_casted[new_name] = T.StructType.fromJson(json.loads(new_column_schema))
                elif isinstance(c.dataType, T.ArrayType):
                    with_columns_casted[new_name] = T.ArrayType.fromJson(json.loads(new_column_schema))
                elif isinstance(c.dataType, T.MapType):
                    with_columns_casted[new_name] = T.MapType.fromJson(json.loads(new_column_schema))

        df = context.data.withColumnsRenamed(with_columns_renamed)
        for c_name, c_type in with_columns_casted.items():
            df = df.withColumn(c_name, F.col(c_name).cast(c_type))

        return context.from_existing(data=df)  # type: ignore

run(context, **_)

Fixes column names in the DataFrame to be valid.

Removes invalid characters from the column names, including the fields of a struct and replaces a single leading underscore by a double underscore.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `context` | `PipelineContext` | The context in which this Action is executed. | *required* |

Raises:

| Type | Description |
|------|-------------|
| `ValueError` | If the data from the context is None. |

Returns:

| Type | Description |
|------|-------------|
| `PipelineContext` | The context after the execution of this Action, containing the DataFrame with cleaned column names. |

Source code in src/cloe_nessy/pipeline/actions/transform_clean_column_names.py
def run(
    self,
    context: PipelineContext,
    **_: Any,
) -> PipelineContext:
    """Fixes column names in the DataFrame to be valid.

    Removes invalid characters from the column names, including the fields of a struct and
    replaces a single leading underscore by a double underscore.

    Args:
        context: The context in which this Action is executed.

    Raises:
        ValueError: If the data from the context is None.

    Returns:
        The context after the execution of this Action, containing the DataFrame with cleaned column names.
    """
    if context.data is None:
        raise ValueError("Data from the context is required for the operation.")

    with_columns_renamed = {}
    with_columns_casted: dict[str, T.StructType | T.ArrayType | T.MapType] = {}

    single_underscrore_at_beginning = r"^_(?=[^_])"

    for c in context.data.schema:
        old_name = c.name
        new_name = re.sub(single_underscrore_at_beginning, "__", re.sub(r"\W", "_", old_name))
        with_columns_renamed[old_name] = new_name

        if isinstance(c.dataType, (T.StructType | T.ArrayType | T.MapType)):
            old_column_schema = c.dataType.json()
            new_column_schema = re.sub(
                r'(?<="name":")[^"]+',
                lambda m: re.sub(r"\W", "_", str(m.group())),
                old_column_schema,
            )
            if isinstance(c.dataType, T.StructType):
                with_columns_casted[new_name] = T.StructType.fromJson(json.loads(new_column_schema))
            elif isinstance(c.dataType, T.ArrayType):
                with_columns_casted[new_name] = T.ArrayType.fromJson(json.loads(new_column_schema))
            elif isinstance(c.dataType, T.MapType):
                with_columns_casted[new_name] = T.MapType.fromJson(json.loads(new_column_schema))

    df = context.data.withColumnsRenamed(with_columns_renamed)
    for c_name, c_type in with_columns_casted.items():
        df = df.withColumn(c_name, F.col(c_name).cast(c_type))

    return context.from_existing(data=df)  # type: ignore

TransformConcatColumnsAction

Bases: PipelineAction

Concatenates the specified columns in the given DataFrame.

Example
Concat Columns:
    action: TRANSFORM_CONCAT_COLUMNS
    options:
        name: address
        columns:
            - street
            - postcode
            - country
        separator: ', '
Concat Column:
    action: TRANSFORM_CONCAT_COLUMNS
    options:
        name: address
        columns:
            - street
            - postcode
            - country

beware of null handling

The separator option is not provided, so the default behavior is to use concat, which returns NULL if any of the concatenated values is NULL.
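
A minimal sketch of the difference, using made-up data:

```python
# Sketch of the NULL-handling difference between concat_ws and concat.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [("Main St", None, "DE")],
    "street string, postcode string, country string",
)

df.select(
    F.concat_ws(", ", "street", "postcode", "country").alias("with_separator"),  # "Main St, DE" (NULL skipped)
    F.concat("street", "postcode", "country").alias("without_separator"),        # NULL (whole result is NULL)
).show(truncate=False)
```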

Source code in src/cloe_nessy/pipeline/actions/transform_concat_columns.py
class TransformConcatColumnsAction(PipelineAction):
    """Concatenates the specified columns in the given DataFrame.

    Example:
        === "concat with separator"
            ```yaml
            Concat Columns:
                action: TRANSFORM_CONCAT_COLUMNS
                options:
                    name: address
                    columns:
                        - street
                        - postcode
                        - country
                    separator: ', '
            ```
        === "concat without separator"
            ```yaml
            Concat Column:
                action: TRANSFORM_CONCAT_COLUMNS
                options:
                    name: address
                    columns:
                        - street
                        - postcode
                        - country
            ```
            !!! warning "beware of null handling"
                The `separator` option is not provided, so the default behavior is to use `concat` which returns `NULL` if any of the concatenated values is `NULL`.
    """

    name: str = "TRANSFORM_CONCAT_COLUMNS"

    def run(
        self,
        context: PipelineContext,
        *,
        name: str = "",
        columns: list[str] | None = None,
        separator: str | None = None,
        **_: Any,
    ) -> PipelineContext:
        """Concatenates the specified columns in the given DataFrame.

        !!!warning

            # Null Handling Behavior

            The behavior of null handling differs based on whether a `separator` is provided:

            - **When `separator` is specified**: The function uses Spark's
                `concat_ws`, which **ignores `NULL` values**. In this case, `NULL`
                values are treated as empty strings (`""`) and are excluded from the
                final concatenated result.
            - **When `separator` is not specified**: The function defaults to
                using Spark's `concat`, which **returns `NULL` if any of the
                concatenated values is `NULL`**. This means the presence of a `NULL`
                in any input will make the entire output `NULL`.

        Args:
            context: The context in which this Action is executed.
            name: The name of the new concatenated column.
            columns: A list of columns to be concatenated.
            separator: The separator used between concatenated column values.

        Raises:
            ValueError: If no name is provided.
            ValueError: If no columns are provided.
            ValueError: If the data from context is None.
            ValueError: If 'columns' is not a list.

        Returns:
            The context after the execution of this Action, containing the
                DataFrame with the concatenated column.
        """
        if not name:
            raise ValueError("No name provided.")
        if not columns:
            raise ValueError("No columns provided.")

        if context.data is None:
            raise ValueError("The data from context is required for the operation.")

        df = context.data

        if isinstance(columns, list):
            if separator:
                df = df.withColumn(name, F.concat_ws(separator, *columns))  # type: ignore
            else:
                df = df.withColumn(name, F.concat(*columns))  # type: ignore
        else:
            raise ValueError("'columns' should be a list, like ['col1', 'col2',]")

        return context.from_existing(data=df)  # type: ignore

run(context, *, name='', columns=None, separator=None, **_)

Concatenates the specified columns in the given DataFrame.

Warning

Null Handling Behavior

The behavior of null handling differs based on whether a separator is provided:

- When separator is specified: The function uses Spark's concat_ws, which ignores NULL values. In this case, NULL values are treated as empty strings ("") and are excluded from the final concatenated result.
- When separator is not specified: The function defaults to using Spark's concat, which returns NULL if any of the concatenated values is NULL. This means the presence of a NULL in any input will make the entire output NULL.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `context` | `PipelineContext` | The context in which this Action is executed. | *required* |
| `name` | `str` | The name of the new concatenated column. | `''` |
| `columns` | `list[str] \| None` | A list of columns to be concatenated. | `None` |
| `separator` | `str \| None` | The separator used between concatenated column values. | `None` |

Raises:

| Type | Description |
|------|-------------|
| `ValueError` | If no name is provided. |
| `ValueError` | If no columns are provided. |
| `ValueError` | If the data from context is None. |
| `ValueError` | If 'columns' is not a list. |

Returns:

| Type | Description |
|------|-------------|
| `PipelineContext` | The context after the execution of this Action, containing the DataFrame with the concatenated column. |

Source code in src/cloe_nessy/pipeline/actions/transform_concat_columns.py
def run(
    self,
    context: PipelineContext,
    *,
    name: str = "",
    columns: list[str] | None = None,
    separator: str | None = None,
    **_: Any,
) -> PipelineContext:
    """Concatenates the specified columns in the given DataFrame.

    !!!warning

        # Null Handling Behavior

        The behavior of null handling differs based on whether a `separator` is provided:

        - **When `separator` is specified**: The function uses Spark's
            `concat_ws`, which **ignores `NULL` values**. In this case, `NULL`
            values are treated as empty strings (`""`) and are excluded from the
            final concatenated result.
        - **When `separator` is not specified**: The function defaults to
            using Spark's `concat`, which **returns `NULL` if any of the
            concatenated values is `NULL`**. This means the presence of a `NULL`
            in any input will make the entire output `NULL`.

    Args:
        context: The context in which this Action is executed.
        name: The name of the new concatenated column.
        columns: A list of columns to be concatenated.
        separator: The separator used between concatenated column values.

    Raises:
        ValueError: If no name is provided.
        ValueError: If no columns are provided.
        ValueError: If the data from context is None.
        ValueError: If 'columns' is not a list.

    Returns:
        The context after the execution of this Action, containing the
            DataFrame with the concatenated column.
    """
    if not name:
        raise ValueError("No name provided.")
    if not columns:
        raise ValueError("No columns provided.")

    if context.data is None:
        raise ValueError("The data from context is required for the operation.")

    df = context.data

    if isinstance(columns, list):
        if separator:
            df = df.withColumn(name, F.concat_ws(separator, *columns))  # type: ignore
        else:
            df = df.withColumn(name, F.concat(*columns))  # type: ignore
    else:
        raise ValueError("'columns' should be a list, like ['col1', 'col2',]")

    return context.from_existing(data=df)  # type: ignore

TransformDecodeAction

Bases: PipelineAction

Decodes values of a specified column in the DataFrame based on the given format.

Example
Expand JSON:
    action: "TRANSFORM_DECODE"
    options:
        column: "data"
        input_format: "json"
        schema: "quality INT, timestamp TIMESTAMP, value DOUBLE"
Decode base64:
    action: TRANSFORM_DECODE
    options:
        column: encoded_data
        input_format: base64
        schema: string
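
The two decode modes roughly correspond to the following PySpark calls. This is a hedged sketch with an illustrative DataFrame; the action's source below additionally infers the JSON schema when none is given:

```python
# Rough equivalent of the two decode modes; data and schemas are illustrative.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, unbase64

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [('{"quality": 1, "timestamp": "2024-01-01T00:00:00", "value": 2.5}', "aGVsbG8=")],
    ["data", "encoded_data"],
)

# input_format: json - parse the string column and expand its fields
json_df = df.withColumn("data", from_json(col("data"), "quality INT, timestamp TIMESTAMP, value DOUBLE"))
json_df = json_df.select(*df.columns, "data.*")

# input_format: base64 - decode the column and cast it to the requested type
b64_df = df.withColumn("encoded_data", unbase64(col("encoded_data")).cast("string"))
b64_df.show()  # encoded_data -> "hello"
```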
Source code in src/cloe_nessy/pipeline/actions/transform_decode.py
class TransformDecodeAction(PipelineAction):
    """Decodes values of a specified column in the DataFrame based on the given format.

    Example:
        === "Decode JSON column"
            ```yaml
            Expand JSON:
                action: "TRANSFORM_DECODE"
                options:
                    column: "data"
                    input_format: "json"
                    schema: "quality INT, timestamp TIMESTAMP, value DOUBLE"
            ```
        === "Decode base64 column"
            ```yaml
            Decode base64:
                action: TRANSFORM_DECODE
                options:
                    column: encoded_data
                    input_format: base64
                    schema: string
            ```
    """

    name: str = "TRANSFORM_DECODE"

    def run(
        self,
        context: PipelineContext,
        *,
        column: str | None = None,
        input_format: str | None = None,
        schema: str | None = None,
        **_: Any,  # define kwargs to match the base class signature
    ) -> PipelineContext:
        """Decodes values of a specified column in the DataFrame based on the given format.

        Args:
            context: The context in which this Action is executed.
            column: The name of the column that should be decoded.
            input_format: The format from which the column should be decoded.
                Currently supported formats are 'base64' and 'json'.
            schema: For JSON input, the schema of the JSON object. If empty,
                the schema is inferred from the first row of the DataFrame. For base64 input,
                the data type to which the column is cast.

        Raises:
            ValueError: If no column is specified.
            ValueError: If no input_format is specified.
            ValueError: If the data from context is None.
            ValueError: If an invalid input_format is provided.

        Returns:
            The context after the execution of this Action, containing the DataFrame with the decoded column(s).
        """
        if not column:
            raise ValueError("No column specified.")
        if not input_format:
            raise ValueError("No input_format specified")
        if context.data is None:
            raise ValueError("Data from the context is required for the operation.")

        df = context.data
        match input_format.lower():
            case "base64":
                df = self._decode_base64(df, column, schema)  # type: ignore
            case "json":
                df = self._decode_json(df, column, schema)  # type: ignore
            case _:
                raise ValueError(
                    f"Invalid input_format: [ '{input_format}' ]. Please specify a valid format to decode.",
                )

        return context.from_existing(data=df)  # type: ignore

    def _decode_base64(self, df: DataFrame, column: str, base64_schema: str | None):
        """Decode base64 column."""
        df_decoded = df.withColumn(column, unbase64(col(column)))
        if base64_schema:
            df_decoded = df_decoded.withColumn(column, col(column).cast(base64_schema))
        return df_decoded

    def _decode_json(self, df: DataFrame, column: str, json_schema: str | None):
        """Decode json column."""
        distinct_schemas = (
            df.select(column)
            .withColumn("json_schema", schema_of_json(col(column)))
            .select("json_schema")
            .dropDuplicates()
        )
        if not (json_schema or distinct_schemas.count() > 0):
            raise RuntimeError("Cannot infer schema from empty DataFrame.")

        elif distinct_schemas.count() > 1:
            raise RuntimeError(f"There is more than one JSON schema in column {column}.")

        if json_schema is None:
            final_json_schema = distinct_schemas.collect()[0].json_schema
        else:
            final_json_schema = json_schema  # type: ignore

        df_decoded = df.withColumn(column, from_json(col(column), final_json_schema)).select(*df.columns, f"{column}.*")

        return df_decoded

_decode_base64(df, column, base64_schema)

Decode base64 column.

Source code in src/cloe_nessy/pipeline/actions/transform_decode.py
def _decode_base64(self, df: DataFrame, column: str, base64_schema: str | None):
    """Decode base64 column."""
    df_decoded = df.withColumn(column, unbase64(col(column)))
    if base64_schema:
        df_decoded = df_decoded.withColumn(column, col(column).cast(base64_schema))
    return df_decoded

_decode_json(df, column, json_schema)

Decode json column.

Source code in src/cloe_nessy/pipeline/actions/transform_decode.py
def _decode_json(self, df: DataFrame, column: str, json_schema: str | None):
    """Decode json column."""
    distinct_schemas = (
        df.select(column)
        .withColumn("json_schema", schema_of_json(col(column)))
        .select("json_schema")
        .dropDuplicates()
    )
    if not (json_schema or distinct_schemas.count() > 0):
        raise RuntimeError("Cannot infer schema from empty DataFrame.")

    elif distinct_schemas.count() > 1:
        raise RuntimeError(f"There is more than one JSON schema in column {column}.")

    if json_schema is None:
        final_json_schema = distinct_schemas.collect()[0].json_schema
    else:
        final_json_schema = json_schema  # type: ignore

    df_decoded = df.withColumn(column, from_json(col(column), final_json_schema)).select(*df.columns, f"{column}.*")

    return df_decoded

run(context, *, column=None, input_format=None, schema=None, **_)

Decodes values of a specified column in the DataFrame based on the given format.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `context` | `PipelineContext` | The context in which this Action is executed. | *required* |
| `column` | `str \| None` | The name of the column that should be decoded. | `None` |
| `input_format` | `str \| None` | The format from which the column should be decoded. Currently supported formats are 'base64' and 'json'. | `None` |
| `schema` | `str \| None` | For JSON input, the schema of the JSON object. If empty, the schema is inferred from the first row of the DataFrame. For base64 input, the data type to which the column is cast. | `None` |

Raises:

| Type | Description |
|------|-------------|
| `ValueError` | If no column is specified. |
| `ValueError` | If no input_format is specified. |
| `ValueError` | If the data from context is None. |
| `ValueError` | If an invalid input_format is provided. |

Returns:

| Type | Description |
|------|-------------|
| `PipelineContext` | The context after the execution of this Action, containing the DataFrame with the decoded column(s). |

Source code in src/cloe_nessy/pipeline/actions/transform_decode.py
def run(
    self,
    context: PipelineContext,
    *,
    column: str | None = None,
    input_format: str | None = None,
    schema: str | None = None,
    **_: Any,  # define kwargs to match the base class signature
) -> PipelineContext:
    """Decodes values of a specified column in the DataFrame based on the given format.

    Args:
        context: The context in which this Action is executed.
        column: The name of the column that should be decoded.
        input_format: The format from which the column should be decoded.
            Currently supported formats are 'base64' and 'json'.
        schema: For JSON input, the schema of the JSON object. If empty,
            the schema is inferred from the first row of the DataFrame. For base64 input,
            the data type to which the column is cast.

    Raises:
        ValueError: If no column is specified.
        ValueError: If no input_format is specified.
        ValueError: If the data from context is None.
        ValueError: If an invalid input_format is provided.

    Returns:
        The context after the execution of this Action, containing the DataFrame with the decoded column(s).
    """
    if not column:
        raise ValueError("No column specified.")
    if not input_format:
        raise ValueError("No input_format specified")
    if context.data is None:
        raise ValueError("Data from the context is required for the operation.")

    df = context.data
    match input_format.lower():
        case "base64":
            df = self._decode_base64(df, column, schema)  # type: ignore
        case "json":
            df = self._decode_json(df, column, schema)  # type: ignore
        case _:
            raise ValueError(
                f"Invalid input_format: [ '{input_format}' ]. Please specify a valid format to decode.",
            )

    return context.from_existing(data=df)  # type: ignore

TransformDistinctAction

Bases: PipelineAction

Selects distinct rows from the DataFrame in the given context.

If a subset is given, these columns are used for duplicate comparison. If no subset is given, all columns are used.

Example
Distinct Columns:
    action: TRANSFORM_DISTINCT
    options:
        subset:
            - first_name
            - last_name
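
A minimal sketch of the subset behaviour, with made-up rows:

```python
# dropDuplicates with and without a subset; the data is illustrative.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [("Ada", "Lovelace", "London"), ("Ada", "Lovelace", "Paris")],
    ["first_name", "last_name", "city"],
)

print(df.dropDuplicates(subset=["first_name", "last_name"]).count())  # 1 - city is ignored
print(df.dropDuplicates().count())                                    # 2 - all columns compared
```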
Source code in src/cloe_nessy/pipeline/actions/transform_distinct.py
class TransformDistinctAction(PipelineAction):
    """Selects distinct rows from the DataFrame in the given context.

    If a subset is given these columns are used for duplicate comparison. If no subset is given all columns are used.

    Example:
        ```yaml
        Distinct Columns:
            action: TRANSFORM_DISTINCT
            options:
                subset:
                    - first_name
                    - last_name
        ```
    """

    name: str = "TRANSFORM_DISTINCT"

    def run(
        self,
        context: PipelineContext,
        *,
        subset: list[str] | None = None,
        **_: Any,
    ) -> PipelineContext:
        """Selects distinct rows from the DataFrame in the given context.

        Args:
            context: The context in which this Action is executed.
            subset: List of column names to use for duplicate comparison (default All columns).

        Raises:
            ValueError: If the data from the context is None.

        Returns:
            The context after the execution of this Action, containing the DataFrame with distinct rows.
        """
        if context.data is None:
            raise ValueError("Data from the context is required for the operation.")

        # check if all columns that are part of the subset are actually part of the dataframe.
        if subset is not None:
            subset_columns_not_in_dataframe = set(subset) - set(context.data.columns)
            if len(subset_columns_not_in_dataframe) != 0:
                raise ValueError(
                    f"The following subset columns are not part of the dataframe: {subset_columns_not_in_dataframe}"
                )

        df = context.data.dropDuplicates(subset=subset)

        return context.from_existing(data=df)  # type: ignore

run(context, *, subset=None, **_)

Selects distinct rows from the DataFrame in the given context.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `context` | `PipelineContext` | The context in which this Action is executed. | *required* |
| `subset` | `list[str] \| None` | List of column names to use for duplicate comparison (default: all columns). | `None` |

Raises:

| Type | Description |
|------|-------------|
| `ValueError` | If the data from the context is None. |

Returns:

| Type | Description |
|------|-------------|
| `PipelineContext` | The context after the execution of this Action, containing the DataFrame with distinct rows. |

Source code in src/cloe_nessy/pipeline/actions/transform_distinct.py
def run(
    self,
    context: PipelineContext,
    *,
    subset: list[str] | None = None,
    **_: Any,
) -> PipelineContext:
    """Selects distinct rows from the DataFrame in the given context.

    Args:
        context: The context in which this Action is executed.
        subset: List of column names to use for duplicate comparison (default All columns).

    Raises:
        ValueError: If the data from the context is None.

    Returns:
        The context after the execution of this Action, containing the DataFrame with distinct rows.
    """
    if context.data is None:
        raise ValueError("Data from the context is required for the operation.")

    # check if all columns that are part of the subset are actually part of the dataframe.
    if subset is not None:
        subset_columns_not_in_dataframe = set(subset) - set(context.data.columns)
        if len(subset_columns_not_in_dataframe) != 0:
            raise ValueError(
                f"The following subset columns are not part of the dataframe: {subset_columns_not_in_dataframe}"
            )

    df = context.data.dropDuplicates(subset=subset)

    return context.from_existing(data=df)  # type: ignore

TransformFilterAction

Bases: PipelineAction

Filters the DataFrame in the given context based on a specified condition.

Example
Filter Columns:
    action: TRANSFORM_FILTER
    options:
        condition: city="Hamburg"
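
The condition is passed to PySpark's DataFrame.filter as a SQL-like expression; a minimal sketch with illustrative data:

```python
# Sketch of the underlying filter call; data and condition are illustrative.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("Hamburg", 1), ("Berlin", 2)], ["city", "id"])

df.filter('city = "Hamburg"').show()  # keeps only the Hamburg row
```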
Source code in src/cloe_nessy/pipeline/actions/transform_filter.py
class TransformFilterAction(PipelineAction):
    """Filters the DataFrame in the given context based on a specified condition.

    Example:
        ```yaml
        Filter Columns:
            action: TRANSFORM_FILTER
            options:
                condition: city="Hamburg"
        ```
    """

    name: str = "TRANSFORM_FILTER"

    def run(
        self,
        context: PipelineContext,
        *,
        condition: str = "",
        **_: Any,
    ) -> PipelineContext:
        """Filters the DataFrame in the given context based on a specified condition.

        Args:
            context: Context in which this Action is executed.
            condition: A SQL-like expression used to filter the DataFrame.

        Raises:
            ValueError: If no condition is provided.
            ValueError: If the data from the context is None.

        Returns:
            Context after the execution of this Action, containing the filtered DataFrame.
        """
        if not condition:
            raise ValueError("No condition provided.")

        if context.data is None:
            raise ValueError("Data from the context is required for the operation.")

        df = context.data

        df_filtered = df.filter(condition=condition)

        return context.from_existing(data=df_filtered)  # type: ignore

run(context, *, condition='', **_)

Filters the DataFrame in the given context based on a specified condition.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `context` | `PipelineContext` | Context in which this Action is executed. | *required* |
| `condition` | `str` | A SQL-like expression used to filter the DataFrame. | `''` |

Raises:

| Type | Description |
|------|-------------|
| `ValueError` | If no condition is provided. |
| `ValueError` | If the data from the context is None. |

Returns:

| Type | Description |
|------|-------------|
| `PipelineContext` | Context after the execution of this Action, containing the filtered DataFrame. |

Source code in src/cloe_nessy/pipeline/actions/transform_filter.py
def run(
    self,
    context: PipelineContext,
    *,
    condition: str = "",
    **_: Any,
) -> PipelineContext:
    """Filters the DataFrame in the given context based on a specified condition.

    Args:
        context: Context in which this Action is executed.
        condition: A SQL-like expression used to filter the DataFrame.

    Raises:
        ValueError: If no condition is provided.
        ValueError: If the data from the context is None.

    Returns:
        Context after the execution of this Action, containing the filtered DataFrame.
    """
    if not condition:
        raise ValueError("No condition provided.")

    if context.data is None:
        raise ValueError("Data from the context is required for the operation.")

    df = context.data

    df_filtered = df.filter(condition=condition)

    return context.from_existing(data=df_filtered)  # type: ignore

TransformGroupAggregate

Bases: PipelineAction

Performs aggregation operations on grouped data within a DataFrame.

This class allows you to group data by specified columns and apply various aggregation functions to other columns. The aggregation functions can be specified as a dictionary where keys are column names and values are either a single aggregation function or a list of functions.

The output DataFrame will contain the grouped columns and the aggregated columns with the aggregation function as a prefix to the column name.

Example
Transform Group Aggregate:
    action: TRANSFORM_GROUP_AGGREGATE
    options:
        grouping_columns:
            - column1
            - column2
        aggregations:
            column3:
                - sum
                - avg
            column4: max

This example groups the DataFrame by column1 and column2, aggregating column3 by sum and average and column4 by max. The resulting DataFrame will contain the grouped columns column1 and column2 and the aggregated columns sum_column3, avg_column3, and max_column4.
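
A rough PySpark equivalent of the example above, with made-up data (the action builds the same aliases dynamically, as shown in the source below):

```python
# Sketch of the groupBy/agg call produced by the example configuration.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [("a", "x", 10, 5), ("a", "x", 20, 7)],
    ["column1", "column2", "column3", "column4"],
)

result = df.groupBy(["column1", "column2"]).agg(
    F.sum("column3").alias("sum_column3"),
    F.avg("column3").alias("avg_column3"),
    F.max("column4").alias("max_column4"),
)
result.show()
```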

Source code in src/cloe_nessy/pipeline/actions/transform_group_aggregate.py
class TransformGroupAggregate(PipelineAction):
    """Performs aggregation operations on grouped data within a DataFrame.

    This class allows you to group data by specified columns and apply various aggregation functions
    to other columns. The aggregation functions can be specified as a dictionary where keys are column names
    and values are either a single aggregation function or a list of functions.

    The output DataFrame will contain the grouped columns and the aggregated columns with the aggregation
    function as a prefix to the column name.

    Example:
        ```yaml
        Transform Group Aggregate:
            action: TRANSFORM_GROUP_AGGREGATE
            options:
                grouping_columns:
                    - column1
                    - column2
                aggregations:
                    column3:
                        - sum
                        - avg
                    column4: max
        ```

        This example groups the DataFrame by `column1` and `column2` and aggregates `column3` by sum and average
        and `column4` by max. The resulting DataFrame will contain the grouped columns `column1` and `column2`
        and the aggregated columns `sum_column3`, `avg_column3`, and `max_column4`.
    """

    name: str = "TRANSFORM_GROUP_AGGREGATE"

    def run(
        self,
        context: PipelineContext,
        *,
        grouping_columns: list[str] | None = None,
        aggregations: dict[str, str | list] | None = None,
        **_: Any,
    ) -> PipelineContext:
        """Executes the aggregation on the grouped data.

        Args:
            context: The context in which this action is executed.
            grouping_columns: A list of columns to group by.
            aggregations: A dictionary where keys are column names and values are either a single
                aggregation function or a list of functions.

        Raises:
            ValueError: If the context data is None.
            ValueError: If no aggregations are provided.
            ValueError: If invalid aggregation operations are provided.
            ValueError: If columns with unsupported data types are included in the aggregations.

        Returns:
            PipelineContext: The context after the execution of this action.
        """
        if context.data is None:
            raise ValueError("Data from the context is required for the operation.")

        if grouping_columns is None:
            raise ValueError("Please provide at least one grouping column")
        if aggregations is None:
            raise ValueError("Please provide aggregations.")

        valid_operations = ["avg", "max", "min", "mean", "sum", "count"]

        for operation in aggregations.values():
            if isinstance(operation, list):
                if not set(operation).issubset(valid_operations):
                    raise ValueError(f"Please provide valid operations. Valid operations are {valid_operations}")
            elif isinstance(operation, str):
                if operation not in valid_operations:
                    raise ValueError(f"Please provide valid operations. Valid operations are {valid_operations}")
            else:
                raise ValueError("OPERATION DATATYPE INVALID")

        aggregation_list = []
        for column_name, aggregation in aggregations.items():
            if isinstance(aggregation, list):
                for subaggregation in aggregation:
                    aggregation_list.append(
                        getattr(F, subaggregation)(column_name).alias(f"{subaggregation}_{column_name}")
                    )
            else:
                aggregation_list.append(getattr(F, aggregation)(column_name).alias(f"{aggregation}_{column_name}"))

        df = context.data.groupBy(grouping_columns).agg(*aggregation_list)

        return context.from_existing(data=df)

run(context, *, grouping_columns=None, aggregations=None, **_)

Executes the aggregation on the grouped data.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `context` | `PipelineContext` | The context in which this action is executed. | *required* |
| `grouping_columns` | `list[str] \| None` | A list of columns to group by. | `None` |
| `aggregations` | `dict[str, str \| list] \| None` | A dictionary where keys are column names and values are either a single aggregation function or a list of functions. | `None` |

Raises:

| Type | Description |
|------|-------------|
| `ValueError` | If the context data is None. |
| `ValueError` | If no aggregations are provided. |
| `ValueError` | If invalid aggregation operations are provided. |
| `ValueError` | If columns with unsupported data types are included in the aggregations. |

Returns:

| Name | Type | Description |
|------|------|-------------|
| `PipelineContext` | `PipelineContext` | The context after the execution of this action. |

Source code in src/cloe_nessy/pipeline/actions/transform_group_aggregate.py
def run(
    self,
    context: PipelineContext,
    *,
    grouping_columns: list[str] | None = None,
    aggregations: dict[str, str | list] | None = None,
    **_: Any,
) -> PipelineContext:
    """Executes the aggregation on the grouped data.

    Args:
        context: The context in which this action is executed.
        grouping_columns: A list of columns to group by.
        aggregations: A dictionary where keys are column names and values are either a single
            aggregation function or a list of functions.

    Raises:
        ValueError: If the context data is None.
        ValueError: If no aggregations are provided.
        ValueError: If invalid aggregation operations are provided.
        ValueError: If columns with unsupported data types are included in the aggregations.

    Returns:
        PipelineContext: The context after the execution of this action.
    """
    if context.data is None:
        raise ValueError("Data from the context is required for the operation.")

    if grouping_columns is None:
        raise ValueError("Please provide at least one grouping column")
    if aggregations is None:
        raise ValueError("Please provide aggregations.")

    valid_operations = ["avg", "max", "min", "mean", "sum", "count"]

    for operation in aggregations.values():
        if isinstance(operation, list):
            if not set(operation).issubset(valid_operations):
                raise ValueError(f"Please provide valid operations. Valid operations are {valid_operations}")
        elif isinstance(operation, str):
            if operation not in valid_operations:
                raise ValueError(f"Please provide valid operations. Valid operations are {valid_operations}")
        else:
            raise ValueError("OPERATION DATATYPE INVALID")

    aggregation_list = []
    for column_name, aggregation in aggregations.items():
        if isinstance(aggregation, list):
            for subaggregation in aggregation:
                aggregation_list.append(
                    getattr(F, subaggregation)(column_name).alias(f"{subaggregation}_{column_name}")
                )
        else:
            aggregation_list.append(getattr(F, aggregation)(column_name).alias(f"{aggregation}_{column_name}"))

    df = context.data.groupBy(grouping_columns).agg(*aggregation_list)

    return context.from_existing(data=df)

TransformHashColumnsAction

Bases: PipelineAction

Hashes specified columns in a DataFrame using a chosen algorithm.

Given the following hash_config:

Example
Hash Columns:
    action: TRANSFORM_HASH_COLUMNS
    options:
        hash_config:
            hashed_column1:
                columns: ["column1", "column2"]
                algorithm: "sha2"
                bits: 224
            hashed_column2:
                columns: ["column1"]
                algorithm: "crc32"

Given a DataFrame df with the following structure:

| column1 | column2 | column3 |
|---------|---------|---------|
| foo | bar | baz |

After running the action, the resulting DataFrame will look like:

| column1 | column2 | column3 | hashed_column1 | hashed_column2 |
|---------|---------|---------|----------------|----------------|
| foo | bar | baz | 17725b837e9c896e7123b142eb980131dcc0baa6160db45d4adfdb21 | 1670361220 |

Hash values might vary

The actual hash values will depend on the hashing algorithm used and the input data.
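
For reference, each hash_config entry applies a PySpark hash function to the listed columns joined with a "||" separator, as in the source below. A hedged sketch with illustrative column names:

```python
# Sketch of the hashing applied for the example hash_config; data is illustrative.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("foo", "bar", "baz")], ["column1", "column2", "column3"])

df = df.withColumn(
    "hashed_column1",
    F.sha2(F.concat_ws("||", F.col("column1"), F.col("column2")), 224).cast("string"),
)
df = df.withColumn("hashed_column2", F.crc32(F.concat_ws("||", F.col("column1"))).cast("string"))
df.show(truncate=False)
```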

Source code in src/cloe_nessy/pipeline/actions/transform_hash_columns.py
class TransformHashColumnsAction(PipelineAction):
    """Hashes specified columns in a DataFrame using a chosen algorithm.

    Given the following `hash_config`:

    Example:
        ```yaml
        Hash Columns:
            action: TRANSFORM_HASH_COLUMNS
            options:
                hash_config:
                    hashed_column1:
                        columns: ["column1", "column2"]
                        algorithm: "sha2"
                        bits: 224
                    hashed_column2:
                        columns: ["column1"]
                        algorithm: "crc32"
        ```

    Given a DataFrame `df` with the following structure:

    | column1 | column2 | column3 |
    |---------|---------|---------|
    |   foo   |   bar   |   baz   |

    After running the action, the resulting DataFrame will look like:

    | column1 | column2 | column3 |                 hashed_column1                            | hashed_column2 |
    |---------|---------|---------|-----------------------------------------------------------|----------------|
    |   foo   |   bar   |   baz   |  17725b837e9c896e7123b142eb980131dcc0baa6160db45d4adfdb21 |  1670361220    |


    !!! note "Hash values might vary"
        The actual hash values will depend on the hashing algorithm used and the input data.
    """

    name: str = "TRANSFORM_HASH_COLUMNS"

    def run(
        self,
        context: PipelineContext,
        *,
        hash_config: HashConfig | None = None,
        **_: Any,
    ) -> PipelineContext:
        """Hashes the specified columns in the DataFrame.

        Args:
            context: Context in which this Action is executed.
            hash_config: Dictionary that contains the configuration for executing the hashing.

        Returns:
            Updated PipelineContext with hashed columns.

        Raises:
            ValueError: If columns are missing, data is None, or algorithm/bits are invalid.
            ValueError: If the hash configuration is invalid.
        """
        if context.data is None:
            raise ValueError("Context data is required for hashing.")

        if not hash_config:
            raise ValueError("Hash configuration is required.")

        df = context.data

        hash_functions = {
            "hash": lambda cols: F.hash(*[F.col(c) for c in cols]).cast("string"),
            "xxhash64": lambda cols: F.xxhash64(F.concat_ws("||", *[F.col(c) for c in cols])).cast("string"),
            "md5": lambda cols: F.md5(F.concat_ws("||", *[F.col(c) for c in cols])).cast("string"),
            "sha1": lambda cols: F.sha1(F.concat_ws("||", *[F.col(c) for c in cols])).cast("string"),
            "sha2": lambda cols, bits: F.sha2(F.concat_ws("||", *[F.col(c) for c in cols]), bits).cast("string"),
            "crc32": lambda cols: F.crc32(F.concat_ws("||", *[F.col(c) for c in cols])).cast("string"),
        }
        default_sha2_bits = 256

        config_obj = HashConfig.model_validate({"hash_config": hash_config})
        for new_col, config in config_obj.hash_config.items():
            hash_func = hash_functions[config.algorithm]
            if config.algorithm == "sha2":
                df = df.withColumn(new_col, hash_func(config.columns, config.bits or default_sha2_bits))  # type: ignore
            else:
                df = df.withColumn(new_col, hash_func(config.columns))  # type: ignore

        return context.from_existing(data=df)

run(context, *, hash_config=None, **_)

Hashes the specified columns in the DataFrame.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `context` | `PipelineContext` | Context in which this Action is executed. | *required* |
| `hash_config` | `HashConfig \| None` | Dictionary that contains the configuration for executing the hashing. | `None` |

Returns:

| Type | Description |
|------|-------------|
| `PipelineContext` | Updated PipelineContext with hashed columns. |

Raises:

| Type | Description |
|------|-------------|
| `ValueError` | If columns are missing, data is None, or algorithm/bits are invalid. |
| `ValueError` | If the hash configuration is invalid. |

Source code in src/cloe_nessy/pipeline/actions/transform_hash_columns.py
def run(
    self,
    context: PipelineContext,
    *,
    hash_config: HashConfig | None = None,
    **_: Any,
) -> PipelineContext:
    """Hashes the specified columns in the DataFrame.

    Args:
        context: Context in which this Action is executed.
        hash_config: Dictionary that contains the configuration for executing the hashing.

    Returns:
        Updated PipelineContext with hashed columns.

    Raises:
        ValueError: If columns are missing, data is None, or algorithm/bits are invalid.
        ValueError: If the hash configuration is invalid.
    """
    if context.data is None:
        raise ValueError("Context data is required for hashing.")

    if not hash_config:
        raise ValueError("Hash configuration is required.")

    df = context.data

    hash_functions = {
        "hash": lambda cols: F.hash(*[F.col(c) for c in cols]).cast("string"),
        "xxhash64": lambda cols: F.xxhash64(F.concat_ws("||", *[F.col(c) for c in cols])).cast("string"),
        "md5": lambda cols: F.md5(F.concat_ws("||", *[F.col(c) for c in cols])).cast("string"),
        "sha1": lambda cols: F.sha1(F.concat_ws("||", *[F.col(c) for c in cols])).cast("string"),
        "sha2": lambda cols, bits: F.sha2(F.concat_ws("||", *[F.col(c) for c in cols]), bits).cast("string"),
        "crc32": lambda cols: F.crc32(F.concat_ws("||", *[F.col(c) for c in cols])).cast("string"),
    }
    default_sha2_bits = 256

    config_obj = HashConfig.model_validate({"hash_config": hash_config})
    for new_col, config in config_obj.hash_config.items():
        hash_func = hash_functions[config.algorithm]
        if config.algorithm == "sha2":
            df = df.withColumn(new_col, hash_func(config.columns, config.bits or default_sha2_bits))  # type: ignore
        else:
            df = df.withColumn(new_col, hash_func(config.columns))  # type: ignore

    return context.from_existing(data=df)

TransformJoinAction

Bases: PipelineAction

Joins the current DataFrame with another DataFrame defined in joined_data.

The join operation is performed on the specified columns, with the type of join determined by the how parameter. Supported join types are listed in the PySpark documentation.

Example
Join Tables:
    action: TRANSFORM_JOIN
    options:
        joined_data: ((step:Transform First Table))
        join_on: id
        how: anti

Referencing a DataFrame from another step

The joined_data parameter is a reference to the DataFrame from another step. The DataFrame is accessed using the result attribute of the PipelineStep. The syntax for referencing the DataFrame is ((step:Step Name)), mind the double parentheses.
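
The dict form of join_on maps columns of the left DataFrame to columns of the right DataFrame; a minimal sketch with illustrative DataFrames and column names:

```python
# Sketch of how a dict-style join_on becomes a PySpark join condition.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
left = spark.createDataFrame([(1, "a")], ["id", "value"]).alias("left")
right = spark.createDataFrame([(1, "b")], ["customer_id", "other"]).alias("right")

join_on = {"id": "customer_id"}  # left column -> right column
condition = [left[l_col] == right[r_col] for l_col, r_col in join_on.items()]

left.join(right, on=condition, how="inner").show()
```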

Source code in src/cloe_nessy/pipeline/actions/transform_join.py
class TransformJoinAction(PipelineAction):
    """Joins the current DataFrame with another DataFrame defined in joined_data.

    The join operation is performed based on specified columns and the type of
    join indicated by the `how` parameter. Supported join types can be taken
    from [PySpark
    documentation](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.join.html)

    Example:
        ```yaml
        Join Tables:
            action: TRANSFORM_JOIN
            options:
                joined_data: ((step:Transform First Table))
                join_on: id
                how: anti
        ```

        !!! note "Referencing a DataFrame from another step"
            The `joined_data` parameter is a reference to the DataFrame from another step.
            The DataFrame is accessed using the `result` attribute of the PipelineStep. The syntax
            for referencing the DataFrame is `((step:Step Name))`, mind the double parentheses.
    """

    name: str = "TRANSFORM_JOIN"

    def run(
        self,
        context: PipelineContext,
        *,
        joined_data: PipelineStep | None = None,
        join_on: list[str] | str | dict[str, str] | None = None,
        how: str = "inner",
        **_: Any,
    ) -> PipelineContext:
        """Joins the current DataFrame with another DataFrame defined in joined_data.

        Args:
            context: Context in which this Action is executed.
            joined_data: The PipelineStep context defining the DataFrame
                to join with as the right side of the join.
            join_on: A string for the join column
                name, a list of column names, or a dictionary mapping columns from the
                left DataFrame to the right DataFrame. This defines the condition for the
                join operation.
            how: The type of join to perform. Must be one of: inner, cross, outer,
                full, fullouter, left, leftouter, right, rightouter, semi, anti, etc.

        Raises:
            ValueError: If no joined_data is provided.
            ValueError: If no join_on is provided.
            ValueError: If the data from context is None.
            ValueError: If the data from the joined_data is None.

        Returns:
            Context after the execution of this Action, containing the result of the join operation.
        """
        if joined_data is None or joined_data.result is None or joined_data.result.data is None:
            raise ValueError("No joined_data provided.")
        if not join_on:
            raise ValueError("No join_on provided.")

        if context.data is None:
            raise ValueError("Data from the context is required for the operation.")

        df_right = joined_data.result.data.alias("right")  # type: ignore
        df_left = context.data.alias("left")  # type: ignore

        if isinstance(join_on, str):
            join_condition = [join_on]
        elif isinstance(join_on, list):
            join_condition = join_on
        else:
            join_condition = [
                df_left[left_column] == df_right[right_column]  # type: ignore
                for left_column, right_column in join_on.items()
            ]

        df = df_left.join(df_right, on=join_condition, how=how)  # type: ignore

        return context.from_existing(data=df)  # type: ignore

run(context, *, joined_data=None, join_on=None, how='inner', **_)

Joins the current DataFrame with another DataFrame defined in joined_data.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `context` | `PipelineContext` | Context in which this Action is executed. | *required* |
| `joined_data` | `PipelineStep \| None` | The PipelineStep context defining the DataFrame to join with as the right side of the join. | `None` |
| `join_on` | `list[str] \| str \| dict[str, str] \| None` | A string for the join column name, a list of column names, or a dictionary mapping columns from the left DataFrame to the right DataFrame. This defines the condition for the join operation. | `None` |
| `how` | `str` | The type of join to perform. Must be one of: inner, cross, outer, full, fullouter, left, leftouter, right, rightouter, semi, anti, etc. | `'inner'` |

Raises:

| Type | Description |
|------|-------------|
| `ValueError` | If no joined_data is provided. |
| `ValueError` | If no join_on is provided. |
| `ValueError` | If the data from context is None. |
| `ValueError` | If the data from the joined_data is None. |

Returns:

| Type | Description |
|------|-------------|
| `PipelineContext` | Context after the execution of this Action, containing the result of the join operation. |

Source code in src/cloe_nessy/pipeline/actions/transform_join.py
def run(
    self,
    context: PipelineContext,
    *,
    joined_data: PipelineStep | None = None,
    join_on: list[str] | str | dict[str, str] | None = None,
    how: str = "inner",
    **_: Any,
) -> PipelineContext:
    """Joins the current DataFrame with another DataFrame defined in joined_data.

    Args:
        context: Context in which this Action is executed.
        joined_data: The PipelineStep context defining the DataFrame
            to join with as the right side of the join.
        join_on: A string for the join column
            name, a list of column names, or a dictionary mapping columns from the
            left DataFrame to the right DataFrame. This defines the condition for the
            join operation.
        how: The type of join to perform. Must be one of: inner, cross, outer,
            full, fullouter, left, leftouter, right, rightouter, semi, anti, etc.

    Raises:
        ValueError: If no joined_data is provided.
        ValueError: If no join_on is provided.
        ValueError: If the data from context is None.
        ValueError: If the data from the joined_data is None.

    Returns:
        Context after the execution of this Action, containing the result of the join operation.
    """
    if joined_data is None or joined_data.result is None or joined_data.result.data is None:
        raise ValueError("No joined_data provided.")
    if not join_on:
        raise ValueError("No join_on provided.")

    if context.data is None:
        raise ValueError("Data from the context is required for the operation.")

    df_right = joined_data.result.data.alias("right")  # type: ignore
    df_left = context.data.alias("left")  # type: ignore

    if isinstance(join_on, str):
        join_condition = [join_on]
    elif isinstance(join_on, list):
        join_condition = join_on
    else:
        join_condition = [
            df_left[left_column] == df_right[right_column]  # type: ignore
            for left_column, right_column in join_on.items()
        ]

    df = df_left.join(df_right, on=join_condition, how=how)  # type: ignore

    return context.from_existing(data=df)  # type: ignore

TransformJsonNormalize

Bases: PipelineAction

Normalizes and flattens the DataFrame by exploding array columns and flattening struct columns.

The method performs recursive normalization on the DataFrame present in the context, ensuring that the order of columns is retained and new columns created by flattening structs are appended after existing columns.

Example

Normalize Tables:
    action: TRANSFORM_JSON_NORMALIZE
    options:
        exclude_columns: coordinates
Example Input Data:

| id | name  | coordinates  | attributes                |
|----|-------|--------------|---------------------------|
| 1  | Alice | [10.0, 20.0] | {"age": 30, "city": "NY"} |
| 2  | Bob   | [30.0, 40.0] | {"age": 25, "city": "LA"} |

Example Output Data:

| id | name  | coordinates  | attributes_age | attributes_city |
|----|-------|--------------|----------------|-----------------|
| 1  | Alice | [10.0, 20.0] | 30             | NY              |
| 2  | Bob   | [30.0, 40.0] | 25             | LA              |
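For orientation, the sketch below performs one flattening pass by hand in plain PySpark, matching the example above (`coordinates` stays untouched because it is excluded, and the `attributes` struct is pulled up and prefixed). The Spark session and sample data are assumptions for illustration only.

```python
# Hand-rolled single pass of the struct flattening shown above; the session
# and sample data are illustrative assumptions, not library code.
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [(1, "Alice", [10.0, 20.0], (30, "NY")), (2, "Bob", [30.0, 40.0], (25, "LA"))],
    "id INT, name STRING, coordinates ARRAY<DOUBLE>, attributes STRUCT<age: INT, city: STRING>",
)

# Pull the struct fields to the top level, prefix them, then drop the struct.
df = df.select(F.col("*"), F.col("attributes.*"))
for nested in ["age", "city"]:
    df = df.withColumnRenamed(nested, f"attributes_{nested}")
df = df.drop("attributes")

df.show()  # id, name, coordinates, attributes_age, attributes_city
```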
Source code in src/cloe_nessy/pipeline/actions/transform_json_normalize.py
class TransformJsonNormalize(PipelineAction):
    """Normalizes and flattens the DataFrame by exploding array columns and flattening struct columns.

    The method performs recursive normalization on the DataFrame present in the context,
    ensuring that the order of columns is retained and new columns created by flattening
    structs are appended after existing columns.

    Example:
        ```yaml
        Normalize Tables:
            action: TRANSFORM_JSON_NORMALIZE
            options:
                exclude_columns: coordinates
        ```
        Example Input Data:

        | id | name   | coordinates          | attributes                |
        |----|--------|----------------------|---------------------------|
        | 1  | Alice  | [10.0, 20.0]         | {"age": 30, "city": "NY"} |
        | 2  | Bob    | [30.0, 40.0]         | {"age": 25, "city": "LA"} |

        Example Output Data:

        | id | name   | coordinates | attributes_age | attributes_city |
        |----|--------|-------------|----------------|-----------------|
        | 1  | Alice  | [10.0, 20.0]| 30             | NY              |
        | 2  | Bob    | [30.0, 40.0]| 25             | LA              |
    """

    name: str = "TRANSFORM_JSON_NORMALIZE"

    def run(
        self,
        context: PipelineContext,
        *,
        exclude_columns: list[str] | None = None,
        **_: Any,
    ) -> PipelineContext:
        """Executes the normalization process on the DataFrame present in the context.

        Please note that columns retain their relative order during the
        normalization process, and new columns created by flattening structs are
        appended after the existing columns.

        Args:
            context: The pipeline context that contains the DataFrame to be normalized.
            exclude_columns: A list of column names to exclude from the normalization process.
                    These columns will not be exploded or flattened.
            **_: Additional keyword arguments (not used).

        Returns:
            A new pipeline context with the normalized DataFrame.

        Raises:
            ValueError: If the DataFrame in the context is `None`.
        """
        if context.data is None:
            raise ValueError("Data from the context is required for the operation.")

        if not exclude_columns:
            exclude_columns = []
        df = TransformJsonNormalize._normalize(context.data, exclude_columns=cast(list, exclude_columns))
        return context.from_existing(data=df)

    @staticmethod
    def _normalize(df, exclude_columns):
        """Recursively normalizes the given DataFrame by exploding arrays and flattening structs.

        This method performs two primary operations:
        1. Explodes any array columns, unless they are in the list of excluded columns.
        2. Flattens any struct columns, renaming nested fields and appending them to the top-level DataFrame.

        The method continues these operations in a loop until there are no array or struct columns left.

        Args:
            df: The input DataFrame to normalize.
            exclude_columns: A list of column names to exclude from the normalization process. These columns
                                         will not be exploded or flattened.

        Returns:
            pyspark.sql.DataFrame: The normalized DataFrame with no array or struct columns.
        """

        def explode_arrays(df, exclude_columns):
            array_present = False
            for col in df.columns:
                if df.schema[col].dataType.typeName() == "array" and col not in exclude_columns:
                    df = df.withColumn(col, F.explode(col))
                    array_present = True
            return df, array_present

        def flatten_structs(df):
            struct_present = False
            struct_columns = [col for col in df.columns if df.schema[col].dataType.typeName() == "struct"]
            for col in struct_columns:
                df = df.select(F.col("*"), F.col(col + ".*"))
                nested_columns = df.select(F.col(col + ".*")).schema.names
                for nested_col in nested_columns:
                    df = df.withColumnRenamed(nested_col, f"{col}_{nested_col}")
                df = df.drop(col)
                struct_present = True
            return df, struct_present

        array_present = True
        struct_present = True

        while array_present or struct_present:
            df, array_present = explode_arrays(df, exclude_columns)
            df, struct_present = flatten_structs(df)

        return df

_normalize(df, exclude_columns) staticmethod

Recursively normalizes the given DataFrame by exploding arrays and flattening structs.

This method performs two primary operations: 1. Explodes any array columns, unless they are in the list of excluded columns. 2. Flattens any struct columns, renaming nested fields and appending them to the top-level DataFrame.

The method continues these operations in a loop until there are no array or struct columns left.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| df | | The input DataFrame to normalize. | *required* |
| exclude_columns | | A list of column names to exclude from the normalization process. These columns will not be exploded or flattened. | *required* |

Returns:

| Type | Description |
|------|-------------|
| pyspark.sql.DataFrame | The normalized DataFrame with no array or struct columns. |

Source code in src/cloe_nessy/pipeline/actions/transform_json_normalize.py
@staticmethod
def _normalize(df, exclude_columns):
    """Recursively normalizes the given DataFrame by exploding arrays and flattening structs.

    This method performs two primary operations:
    1. Explodes any array columns, unless they are in the list of excluded columns.
    2. Flattens any struct columns, renaming nested fields and appending them to the top-level DataFrame.

    The method continues these operations in a loop until there are no array or struct columns left.

    Args:
        df: The input DataFrame to normalize.
        exclude_columns: A list of column names to exclude from the normalization process. These columns
                                     will not be exploded or flattened.

    Returns:
        pyspark.sql.DataFrame: The normalized DataFrame with no array or struct columns.
    """

    def explode_arrays(df, exclude_columns):
        array_present = False
        for col in df.columns:
            if df.schema[col].dataType.typeName() == "array" and col not in exclude_columns:
                df = df.withColumn(col, F.explode(col))
                array_present = True
        return df, array_present

    def flatten_structs(df):
        struct_present = False
        struct_columns = [col for col in df.columns if df.schema[col].dataType.typeName() == "struct"]
        for col in struct_columns:
            df = df.select(F.col("*"), F.col(col + ".*"))
            nested_columns = df.select(F.col(col + ".*")).schema.names
            for nested_col in nested_columns:
                df = df.withColumnRenamed(nested_col, f"{col}_{nested_col}")
            df = df.drop(col)
            struct_present = True
        return df, struct_present

    array_present = True
    struct_present = True

    while array_present or struct_present:
        df, array_present = explode_arrays(df, exclude_columns)
        df, struct_present = flatten_structs(df)

    return df

run(context, *, exclude_columns=None, **_)

Executes the normalization process on the DataFrame present in the context.

Please note that columns retain their relative order during the normalization process, and new columns created by flattening structs are appended after the existing columns.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| context | PipelineContext | The pipeline context that contains the DataFrame to be normalized. | *required* |
| exclude_columns | list[str] \| None | A list of column names to exclude from the normalization process. These columns will not be exploded or flattened. | None |
| **_ | Any | Additional keyword arguments (not used). | {} |

Returns:

| Type | Description |
|------|-------------|
| PipelineContext | A new pipeline context with the normalized DataFrame. |

Raises:

| Type | Description |
|------|-------------|
| ValueError | If the DataFrame in the context is None. |

Source code in src/cloe_nessy/pipeline/actions/transform_json_normalize.py
def run(
    self,
    context: PipelineContext,
    *,
    exclude_columns: list[str] | None = None,
    **_: Any,
) -> PipelineContext:
    """Executes the normalization process on the DataFrame present in the context.

    Please note that columns retain their relative order during the
    normalization process, and new columns created by flattening structs are
    appended after the existing columns.

    Args:
        context: The pipeline context that contains the DataFrame to be normalized.
        exclude_columns: A list of column names to exclude from the normalization process.
                These columns will not be exploded or flattened.
        **_: Additional keyword arguments (not used).

    Returns:
        A new pipeline context with the normalized DataFrame.

    Raises:
        ValueError: If the DataFrame in the context is `None`.
    """
    if context.data is None:
        raise ValueError("Data from the context is required for the operation.")

    if not exclude_columns:
        exclude_columns = []
    df = TransformJsonNormalize._normalize(context.data, exclude_columns=cast(list, exclude_columns))
    return context.from_existing(data=df)

TransformRenameColumnsAction

Bases: PipelineAction

Renames the specified columns in the DataFrame.

This method updates the DataFrame in the provided context by renaming columns according to the mapping defined in the columns dictionary, where each key represents an old column name and its corresponding value represents the new column name.

Example
Rename Column:
    action: TRANSFORM_RENAME_COLUMNS
    options:
        columns:
            a_very_long_column_name: shortname
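Under the hood this maps onto PySpark's `withColumnsRenamed` (available in recent PySpark versions). A minimal standalone sketch, assuming a Spark session and a toy DataFrame:

```python
# Minimal sketch of the rename this action performs; session and data are
# illustrative assumptions.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, "x")], ["id", "a_very_long_column_name"])

# Same old-name -> new-name mapping as the `columns` option above.
df = df.withColumnsRenamed({"a_very_long_column_name": "shortname"})
df.show()  # columns: id, shortname
```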
Source code in src/cloe_nessy/pipeline/actions/transform_rename_columns.py
class TransformRenameColumnsAction(PipelineAction):
    """Renames the specified columns in the DataFrame.

    This method updates the DataFrame in the provided context by renaming columns according
    to the mapping defined in the `columns` dictionary, where each key represents an old column
    name and its corresponding value represents the new column name.

    Example:
        ```yaml
        Rename Column:
            action: TRANSFORM_RENAME_COLUMNS
            options:
                columns:
                    a_very_long_column_name: shortname
        ```
    """

    name: str = "TRANSFORM_RENAME_COLUMNS"

    def run(
        self,
        context: PipelineContext,
        *,
        columns: dict[str, str] | None = None,
        **_: Any,
    ) -> PipelineContext:
        """Renames the specified columns in the DataFrame.

        Args:
            context: Context in which this Action is executed.
            columns: A dictionary where the key is the old column name
                and the value is the new column name.

        Raises:
            ValueError: If no columns are provided.
            ValueError: If the data from context is None.

        Returns:
            Context after the execution of this Action.
        """
        if not columns:
            raise ValueError("No columns provided.")

        if context.data is None:
            raise ValueError("Data from the context is required for the operation.")

        df = context.data

        if isinstance(columns, dict):
            df = df.withColumnsRenamed(columns)
        else:
            raise ValueError("'columns' should be a dict, like {'old_name_1':'new_name_1', 'old_name_2':'new_name_2'}")

        return context.from_existing(data=df)  # type: ignore

run(context, *, columns=None, **_)

Renames the specified columns in the DataFrame.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| context | PipelineContext | Context in which this Action is executed. | *required* |
| columns | dict[str, str] \| None | A dictionary where the key is the old column name and the value is the new column name. | None |

Raises:

| Type | Description |
|------|-------------|
| ValueError | If no columns are provided. |
| ValueError | If the data from context is None. |

Returns:

| Type | Description |
|------|-------------|
| PipelineContext | Context after the execution of this Action. |

Source code in src/cloe_nessy/pipeline/actions/transform_rename_columns.py
def run(
    self,
    context: PipelineContext,
    *,
    columns: dict[str, str] | None = None,
    **_: Any,
) -> PipelineContext:
    """Renames the specified columns in the DataFrame.

    Args:
        context: Context in which this Action is executed.
        columns: A dictionary where the key is the old column name
            and the value is the new column name.

    Raises:
        ValueError: If no columns are provided.
        ValueError: If the data from context is None.

    Returns:
        Context after the execution of this Action.
    """
    if not columns:
        raise ValueError("No columns provided.")

    if context.data is None:
        raise ValueError("Data from the context is required for the operation.")

    df = context.data

    if isinstance(columns, dict):
        df = df.withColumnsRenamed(columns)
    else:
        raise ValueError("'columns' should be a dict, like {'old_name_1':'new_name_1', 'old_name_2':'new_name_2'}")

    return context.from_existing(data=df)  # type: ignore

TransformReplaceValuesAction

Bases: PipelineAction

Replaces specified values in the given DataFrame.

This method iterates over the specified replace dictionary, where each key is a column name and each value is another dictionary containing old values as keys and new values as the corresponding values. The method updates the DataFrame by replacing occurrences of the old values with the new ones in the specified columns.

Example
Replace Values:
    action: TRANSFORM_REPLACE_VALUES
    options:
        replace:
            empl_function:
                sales_employee: seller
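In plain PySpark this corresponds to `DataFrame.replace` applied per column via the `subset` argument. A minimal sketch, assuming a Spark session and toy data:

```python
# Minimal sketch of the per-column replacement; session and data are
# illustrative assumptions.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, "sales_employee"), (2, "manager")], ["id", "empl_function"])

replace = {"empl_function": {"sales_employee": "seller"}}
for column, to_replace in replace.items():
    df = df.replace(to_replace=to_replace, subset=[column])

df.show()  # empl_function becomes: seller, manager
```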
Source code in src/cloe_nessy/pipeline/actions/transform_replace_values.py
class TransformReplaceValuesAction(PipelineAction):
    """Replaces specified values in the given DataFrame.

    This method iterates over the specified `replace` dictionary, where each key is a column name
    and each value is another dictionary containing old values as keys and new values as the corresponding
    values. The method updates the DataFrame by replacing occurrences of the old values with the new ones
    in the specified columns.

    Example:
        ```yaml
        Replace Values:
            action: TRANSFORM_REPLACE_VALUES
            options:
                replace:
                    empl_function:
                        sales_employee: seller
        ```
    """

    name: str = "TRANSFORM_REPLACE_VALUES"

    def run(
        self,
        context: PipelineContext,
        *,
        replace: dict[str, dict[str, str]] | None = None,
        **_: Any,
    ) -> PipelineContext:
        """Replaces specified values in the given DataFrame.

        Args:
            context: Context in which this Action is executed.
            replace: A dictionary where each key is the column name
                and the corresponding value is another dictionary mapping old values to new values.

        Raises:
            ValueError: If no replace values are provided.
            ValueError: If the data from context is None.

        Returns:
            Context after the execution of this Action.
        """
        if not replace:
            raise ValueError("No replace values provided.")

        if context.data is None:
            raise ValueError("Data from the context is required for the operation.")

        df = context.data
        for column, to_replace in replace.items():
            df = df.replace(to_replace=to_replace, subset=[column])  # type: ignore

        return context.from_existing(data=df)  # type: ignore

run(context, *, replace=None, **_)

Replaces specified values in the given DataFrame.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| context | PipelineContext | Context in which this Action is executed. | *required* |
| replace | dict[str, dict[str, str]] \| None | A dictionary where each key is the column name and the corresponding value is another dictionary mapping old values to new values. | None |

Raises:

| Type | Description |
|------|-------------|
| ValueError | If no replace values are provided. |
| ValueError | If the data from context is None. |

Returns:

| Type | Description |
|------|-------------|
| PipelineContext | Context after the execution of this Action. |

Source code in src/cloe_nessy/pipeline/actions/transform_replace_values.py
def run(
    self,
    context: PipelineContext,
    *,
    replace: dict[str, dict[str, str]] | None = None,
    **_: Any,
) -> PipelineContext:
    """Replaces specified values in the given DataFrame.

    Args:
        context: Context in which this Action is executed.
        replace: A dictionary where each key is the column name
            and the corresponding value is another dictionary mapping old values to new values.

    Raises:
        ValueError: If no replace values are provided.
        ValueError: If the data from context is None.

    Returns:
        Context after the execution of this Action.
    """
    if not replace:
        raise ValueError("No replace values provided.")

    if context.data is None:
        raise ValueError("Data from the context is required for the operation.")

    df = context.data
    for column, to_replace in replace.items():
        df = df.replace(to_replace=to_replace, subset=[column])  # type: ignore

    return context.from_existing(data=df)  # type: ignore

TransformSelectColumnsAction

Bases: PipelineAction

Selects specified columns from the given DataFrame.

This method allows you to include or exclude specific columns from the DataFrame. If include_columns is provided, only those columns will be selected. If exclude_columns is provided, all columns except those will be selected. The method ensures that the specified columns exist in the DataFrame before performing the selection.

Example

Example Input Data:

| id | name  | coordinates  | attributes                |
|----|-------|--------------|---------------------------|
| 1  | Alice | [10.0, 20.0] | {"age": 30, "city": "NY"} |
| 2  | Bob   | [30.0, 40.0] | {"age": 25, "city": "LA"} |

Select Columns:
    action: TRANSFORM_SELECT_COLUMNS
    options:
        include_columns:
            - id
            - name
            - coordinates
Example Output Data:

| id | name  | coordinates  |
|----|-------|--------------|
| 1  | Alice | [10.0, 20.0] |
| 2  | Bob   | [30.0, 40.0] |

Select Columns:
    action: TRANSFORM_SELECT_COLUMNS
    options:
        exclude_columns:
            - coordinates
Example Output Data:

| id | name  | attributes                |
|----|-------|---------------------------|
| 1  | Alice | {"age": 30, "city": "NY"} |
| 2  | Bob   | {"age": 25, "city": "LA"} |
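In PySpark terms, `include_columns` becomes a `select` and `exclude_columns` becomes a `drop`. A minimal sketch, assuming a Spark session and a toy DataFrame:

```python
# Minimal sketch of include vs. exclude selection; session and data are
# illustrative assumptions.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, "Alice", "NY")], ["id", "name", "city"])

df_included = df.select("id", "name")  # keep only the listed columns
df_excluded = df.drop("city")          # keep everything except the listed columns
```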
Source code in src/cloe_nessy/pipeline/actions/transform_select_columns.py
class TransformSelectColumnsAction(PipelineAction):
    """Selects specified columns from the given DataFrame.

    This method allows you to include or exclude specific columns from the
    DataFrame. If `include_columns` is provided, only those columns will be
    selected. If `exclude_columns` is provided, all columns except those will be
    selected. The method ensures that the specified columns exist in the
    DataFrame before performing the selection.

    Example:
        Example Input Data:

        | id | name   | coordinates          | attributes                |
        |----|--------|----------------------|---------------------------|
        | 1  | Alice  | [10.0, 20.0]         | {"age": 30, "city": "NY"} |
        | 2  | Bob    | [30.0, 40.0]         | {"age": 25, "city": "LA"} |
        === "Include Columns"
            ```yaml
            Select Columns:
                action: TRANSFORM_SELECT_COLUMNS
                options:
                    include_columns:
                        - id
                        - name
                        - coordinates
            ```
            Example Output Data:

            | id | name   | coordinates          |
            |----|--------|----------------------|
            | 1  | Alice  | [10.0, 20.0]         |
            | 2  | Bob    | [30.0, 40.0]         |

        === "Exclude Columns"
            ```yaml
            Select Columns:
                action: TRANSFORM_SELECT_COLUMNS
                options:
                    exclude_columns:
                        - coordinates
            ```
            Example Output Data:

            | id | name   | attributes                |
            |----|--------|---------------------------|
            | 1  | Alice  | {"age": 30, "city": "NY"} |
            | 2  | Bob    | {"age": 25, "city": "LA"} |

    """

    name: str = "TRANSFORM_SELECT_COLUMNS"

    def run(
        self,
        context: PipelineContext,
        *,
        include_columns: list[str] | None = None,
        exclude_columns: list[str] | None = None,
        raise_on_non_existing_columns: bool = True,
        **_: Any,
    ) -> PipelineContext:
        """Selects specified columns from the given DataFrame.

        Args:
            context: Context in which this Action is executed.
            include_columns: A list of column names that should be included.
                If provided, only these columns will be selected.
            exclude_columns: A list of column names that should be excluded.
                If provided, all columns except these will be selected.
            raise_on_non_existing_columns: If True, raise an error if a specified
                column is not found in the DataFrame. If False, ignore the column
                and continue with the selection.

        Raises:
            ValueError: If a specified column is not found in the DataFrame.
            ValueError: If neither include_columns nor exclude_columns are provided,
                or if both are provided.

        Returns:
            Context after the execution of this Action.
        """
        if context.data is None:
            raise ValueError("Data from the context is required for the operation.")

        df = context.data

        if (not include_columns and not exclude_columns) or (include_columns and exclude_columns):
            raise ValueError("Please define either 'include_columns' or 'exclude_columns'.")

        def check_missing_columns(df, columns, raise_on_non_existing_columns):
            if raise_on_non_existing_columns:
                missing_columns = [col for col in columns if col not in df.columns]
                if missing_columns:
                    raise ValueError(f"Columns not found in DataFrame: {missing_columns}")

        try:
            if include_columns:
                check_missing_columns(df, include_columns, raise_on_non_existing_columns)
                df_selected = df.select(*include_columns)
            elif exclude_columns:
                check_missing_columns(df, exclude_columns, raise_on_non_existing_columns)
                df_selected = df.drop(*exclude_columns)
        except Exception as e:
            raise ValueError(f"Column selection error: {e}") from e

        return context.from_existing(data=df_selected)  # type: ignore

run(context, *, include_columns=None, exclude_columns=None, raise_on_non_existing_columns=True, **_)

Selects specified columns from the given DataFrame.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| context | PipelineContext | Context in which this Action is executed. | *required* |
| include_columns | list[str] \| None | A list of column names that should be included. If provided, only these columns will be selected. | None |
| exclude_columns | list[str] \| None | A list of column names that should be excluded. If provided, all columns except these will be selected. | None |
| raise_on_non_existing_columns | bool | If True, raise an error if a specified column is not found in the DataFrame. If False, ignore the column and continue with the selection. | True |

Raises:

| Type | Description |
|------|-------------|
| ValueError | If a specified column is not found in the DataFrame. |
| ValueError | If neither include_columns nor exclude_columns are provided, or if both are provided. |

Returns:

| Type | Description |
|------|-------------|
| PipelineContext | Context after the execution of this Action. |

Source code in src/cloe_nessy/pipeline/actions/transform_select_columns.py
def run(
    self,
    context: PipelineContext,
    *,
    include_columns: list[str] | None = None,
    exclude_columns: list[str] | None = None,
    raise_on_non_existing_columns: bool = True,
    **_: Any,
) -> PipelineContext:
    """Selects specified columns from the given DataFrame.

    Args:
        context: Context in which this Action is executed.
        include_columns: A list of column names that should be included.
            If provided, only these columns will be selected.
        exclude_columns: A list of column names that should be excluded.
            If provided, all columns except these will be selected.
        raise_on_non_existing_columns: If True, raise an error if a specified
            column is not found in the DataFrame. If False, ignore the column
            and continue with the selection.

    Raises:
        ValueError: If a specified column is not found in the DataFrame.
        ValueError: If neither include_columns nor exclude_columns are provided,
            or if both are provided.

    Returns:
        Context after the execution of this Action.
    """
    if context.data is None:
        raise ValueError("Data from the context is required for the operation.")

    df = context.data

    if (not include_columns and not exclude_columns) or (include_columns and exclude_columns):
        raise ValueError("Please define either 'include_columns' or 'exclude_columns'.")

    def check_missing_columns(df, columns, raise_on_non_existing_columns):
        if raise_on_non_existing_columns:
            missing_columns = [col for col in columns if col not in df.columns]
            if missing_columns:
                raise ValueError(f"Columns not found in DataFrame: {missing_columns}")

    try:
        if include_columns:
            check_missing_columns(df, include_columns, raise_on_non_existing_columns)
            df_selected = df.select(*include_columns)
        elif exclude_columns:
            check_missing_columns(df, exclude_columns, raise_on_non_existing_columns)
            df_selected = df.drop(*exclude_columns)
    except Exception as e:
        raise ValueError(f"Column selection error: {e}") from e

    return context.from_existing(data=df_selected)  # type: ignore

TransformSqlAction

Bases: PipelineAction

Executes a SQL statement on a DataFrame within the provided context.

A temporary view is created from the current DataFrame, and the SQL statement is executed on that view. The resulting DataFrame is returned.

Example
SQL Transform:
    action: TRANSFORM_SQL
    options:
        sql_statement: select city, revenue, firm from {DATA_FRAME} where product="Databricks"

Note

The SQL statement should reference the DataFrame as "{DATA_FRAME}". This nessy-specific placeholder will be replaced with your input DataFrame from the context. If your pipeline is defined as an f-string, you can escape the curly braces by doubling them, e.g., "{{DATA_FRAME}}".
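A minimal sketch of the placeholder mechanics outside the pipeline, assuming a Spark session and toy data (the `v_` prefix on the view name is only a convenience for this sketch):

```python
# Sketch of how the {DATA_FRAME} placeholder is resolved: register the
# DataFrame as a temp view, then substitute the view name into the statement.
# Session, data, and the v_ prefix are illustrative assumptions.
import uuid

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [("Berlin", 100, "ACME", "Databricks")], ["city", "revenue", "firm", "product"]
)

sql_statement = 'select city, revenue, firm from {DATA_FRAME} where product="Databricks"'

temp_view_name = "v_" + str(uuid.uuid1()).replace("-", "_")
df.createTempView(temp_view_name)

spark.sql(sql_statement.format(DATA_FRAME=temp_view_name)).show()
```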

Source code in src/cloe_nessy/pipeline/actions/transform_generic_sql.py
class TransformSqlAction(PipelineAction):
    """Executes a SQL statement on a DataFrame within the provided context.

    A temporary view is created from the current DataFrame, and the SQL
    statement is executed on that view. The resulting DataFrame is returned.

    Example:
        ```yaml
        SQL Transform:
            action: TRANSFORM_SQL
            options:
                sql_statement: select city, revenue, firm from {DATA_FRAME} where product="Databricks"
        ```
        !!! note
            The SQL statement should reference the DataFrame as "{DATA_FRAME}".
            This nessy specific placeholder will be replaced with your input
            DataFrame from the context. If your pipeline is defined as an
            f-string, you can escape the curly braces by doubling them, e.g.,
            "{{DATA_FRAME}}".
    """

    name: str = "TRANSFORM_SQL"

    def run(
        self,
        context: PipelineContext,
        *,
        sql_statement: str = "",
        **kwargs: Any,
    ) -> PipelineContext:
        """Executes a SQL statement on a DataFrame within the provided context.

        Args:
            context: Context in which this Action is executed.
            sql_statement: A string containing the SQL statement to be
                executed. The source table should be referred to as "{DATA_FRAME}".
            **kwargs: Additional keyword arguments are passed as placeholders to the
                SQL statement.

        Raises:
            ValueError: If "{DATA_FRAME}" is not included in the SQL statement.
            ValueError: If no SQL statement is provided.
            ValueError: If the data from the context is None.

        Returns:
            Context after the execution of this Action, containing the DataFrame resulting from the SQL statement.
        """
        if not sql_statement:
            raise ValueError("No SQL statement provided.")

        if context.data is None:
            raise ValueError("Data from the context is required for the operation.")

        _spark = SessionManager.get_spark_session()

        temp_view_name = str(uuid.uuid1()).replace("-", "_")
        context.data.createTempView(temp_view_name)

        if "FROM {DATA_FRAME}".casefold() not in sql_statement.casefold():
            raise ValueError("Please use 'FROM {DATA_FRAME}' in your SQL statement.")

        df = _spark.sql(sql_statement.format(DATA_FRAME=temp_view_name, **kwargs))

        return context.from_existing(data=df)

run(context, *, sql_statement='', **kwargs)

Executes a SQL statement on a DataFrame within the provided context.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| context | PipelineContext | Context in which this Action is executed. | *required* |
| sql_statement | str | A string containing the SQL statement to be executed. The source table should be referred to as "{DATA_FRAME}". | '' |
| **kwargs | Any | Additional keyword arguments are passed as placeholders to the SQL statement. | {} |

Raises:

| Type | Description |
|------|-------------|
| ValueError | If "{DATA_FRAME}" is not included in the SQL statement. |
| ValueError | If no SQL statement is provided. |
| ValueError | If the data from the context is None. |

Returns:

| Type | Description |
|------|-------------|
| PipelineContext | Context after the execution of this Action, containing the DataFrame resulting from the SQL statement. |

Source code in src/cloe_nessy/pipeline/actions/transform_generic_sql.py
def run(
    self,
    context: PipelineContext,
    *,
    sql_statement: str = "",
    **kwargs: Any,
) -> PipelineContext:
    """Executes a SQL statement on a DataFrame within the provided context.

    Args:
        context: Context in which this Action is executed.
        sql_statement: A string containing the SQL statement to be
            executed. The source table should be referred to as "{DATA_FRAME}".
        **kwargs: Additional keyword arguments are passed as placeholders to the
            SQL statement.

    Raises:
        ValueError: If "{DATA_FRAME}" is not included in the SQL statement.
        ValueError: If no SQL statement is provided.
        ValueError: If the data from the context is None.

    Returns:
        Context after the execution of this Action, containing the DataFrame resulting from the SQL statement.
    """
    if not sql_statement:
        raise ValueError("No SQL statement provided.")

    if context.data is None:
        raise ValueError("Data from the context is required for the operation.")

    _spark = SessionManager.get_spark_session()

    temp_view_name = str(uuid.uuid1()).replace("-", "_")
    context.data.createTempView(temp_view_name)

    if "FROM {DATA_FRAME}".casefold() not in sql_statement.casefold():
        raise ValueError("Please use 'FROM {DATA_FRAME}' in your SQL statement.")

    df = _spark.sql(sql_statement.format(DATA_FRAME=temp_view_name, **kwargs))

    return context.from_existing(data=df)

TransformUnionAction

Bases: PipelineAction

Unions multiple DataFrames together.

This method takes the current DataFrame from the context and unites it with additional DataFrames specified in the union_data argument. All DataFrames must have the same schema. If any DataFrame in union_data is None or empty, a ValueError will be raised.

Example
Union Tables:
    action: TRANSFORM_UNION
    options:
        union_data:
            - ((step: Filter First Table))
            - ((step: SQL Transform Second Table))

Referencing a DataFrame from another step

The union_data parameter is a reference to the DataFrame from another step. The DataFrame is accessed using the result attribute of the PipelineStep. The syntax for referencing the DataFrame is ((step:Step Name)), mind the double parentheses.
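All DataFrames involved must share the same schema; the union itself is a fold over `DataFrame.unionAll`, as in the source below. A minimal standalone sketch with an assumed session and toy data:

```python
# Minimal sketch of the union fold; session and data are illustrative
# assumptions.
from functools import reduce

from pyspark.sql import DataFrame, SparkSession

spark = SparkSession.builder.getOrCreate()
df_main = spark.createDataFrame([(1, "a")], ["id", "value"])
df_other = spark.createDataFrame([(2, "b")], ["id", "value"])

df = reduce(DataFrame.unionAll, [df_main, df_other])
df.show()  # two rows, same schema
```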

Source code in src/cloe_nessy/pipeline/actions/transform_union.py
class TransformUnionAction(PipelineAction):
    """Unions multiple DataFrames together.

    This method takes the current DataFrame from the context and unites it with
    additional DataFrames specified in the `union_data` argument. All DataFrames
    must have the same schema. If any DataFrame in `union_data` is None or
    empty, a ValueError will be raised.

    Example:
        ```yaml
        Union Tables:
            action: TRANSFORM_UNION
            options:
                union_data:
                    - ((step: Filter First Table))
                    - ((step: SQL Transform Second Table))
        ```
        !!! note "Referencing a DataFrame from another step"
            The `union_data` parameter is a reference to the DataFrame from another step.
            The DataFrame is accessed using the `result` attribute of the PipelineStep. The syntax
            for referencing the DataFrame is `((step:Step Name))`, mind the double parentheses.
    """

    name: str = "TRANSFORM_UNION"

    def run(
        self,
        context: PipelineContext,
        *,
        union_data: list[PipelineStep] | None = None,
        **_: Any,
    ) -> PipelineContext:
        """Unions multiple DataFrames together.

        Args:
            context: Context in which this Action is executed.
            union_data: A list of PipelineSteps that define the DataFrames
                to union with the current context.

        Raises:
            ValueError: If no union_data is provided.
            ValueError: If the data from context is None.
            ValueError: If the data from any of the union_data is None.

        Returns:
            Context after the execution of this Action.
        """
        if not union_data:
            raise ValueError("No union_data provided.")

        # Check that all union_data contexts have valid data
        result_contexts = []
        if context.data is None:
            raise ValueError("Data from the context is required for the operation.")

        for ctx in union_data:
            if ctx.result is None or ctx.result.data is None:
                raise ValueError(f"Data from the context of step '{ctx.name}' is required for the operation.")
            result_contexts.append(ctx.result.data)

        # Union all DataFrames
        union_dfs = [context.data] + result_contexts
        df = reduce(DataFrame.unionAll, union_dfs)  # type: ignore

        return context.from_existing(data=df)  # type: ignore

run(context, *, union_data=None, **_)

Unions multiple DataFrames together.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| context | PipelineContext | Context in which this Action is executed. | *required* |
| union_data | list[PipelineStep] \| None | A list of PipelineSteps that define the DataFrames to union with the current context. | None |

Raises:

| Type | Description |
|------|-------------|
| ValueError | If no union_data is provided. |
| ValueError | If the data from context is None. |
| ValueError | If the data from any of the union_data is None. |

Returns:

| Type | Description |
|------|-------------|
| PipelineContext | Context after the execution of this Action. |

Source code in src/cloe_nessy/pipeline/actions/transform_union.py
def run(
    self,
    context: PipelineContext,
    *,
    union_data: list[PipelineStep] | None = None,
    **_: Any,
) -> PipelineContext:
    """Unions multiple DataFrames together.

    Args:
        context: Context in which this Action is executed.
        union_data: A list of PipelineSteps that define the DataFrames
            to union with the current context.

    Raises:
        ValueError: If no union_data is provided.
        ValueError: If the data from context is None.
        ValueError: If the data from any of the union_data is None.

    Returns:
        Context after the execution of this Action.
    """
    if not union_data:
        raise ValueError("No union_data provided.")

    # Check that all union_data contexts have valid data
    result_contexts = []
    if context.data is None:
        raise ValueError("Data from the context is required for the operation.")

    for ctx in union_data:
        if ctx.result is None or ctx.result.data is None:
            raise ValueError(f"Data from the context of step '{ctx.name}' is required for the operation.")
        result_contexts.append(ctx.result.data)

    # Union all DataFrames
    union_dfs = [context.data] + result_contexts
    df = reduce(DataFrame.unionAll, union_dfs)  # type: ignore

    return context.from_existing(data=df)  # type: ignore

WriteCatalogTableAction

Bases: PipelineAction

Writes a DataFrame to a specified catalog table using CatalogWriter.

Example
Write Table to Catalog:
    action: WRITE_CATALOG_TABLE
    options:
        table_identifier: my_catalog.business_schema.sales_table
        mode: append
        partition_by: day
        options:
            mergeSchema: true
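For orientation, the options above correspond roughly to the following `DataFrameWriter` call. This is a hedged sketch, not the `CatalogWriter` implementation; `df` stands in for the DataFrame held in the pipeline context, and a catalog-enabled Spark session is assumed:

```python
# Rough equivalent of the example configuration above; df and the session
# setup are assumptions, and CatalogWriter may add further behaviour.
(
    df.write
    .mode("append")
    .partitionBy("day")
    .option("mergeSchema", "true")
    .saveAsTable("my_catalog.business_schema.sales_table")
)
```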
Source code in src/cloe_nessy/pipeline/actions/write_catalog_table.py
class WriteCatalogTableAction(PipelineAction):
    """Writes a DataFrame to a specified catalog table using [CatalogWriter][cloe_nessy.integration.writer.CatalogWriter].

    Example:
        ```yaml
        Write Table to Catalog:
            action: WRITE_CATALOG_TABLE
            options:
                table_identifier: my_catalog.business_schema.sales_table
                mode: append
                partition_by: day
                options:
                    mergeSchema: true
        ```
    """

    name: str = "WRITE_CATALOG_TABLE"

    @staticmethod
    def run(
        context: PipelineContext,
        *,
        table_identifier: str | None = None,
        mode: str = "append",
        partition_by: str | list[str] | None = None,
        options: dict[str, str] | None = None,
        **_: Any,
    ) -> PipelineContext:
        """Writes a DataFrame to a specified catalog table.

        Args:
            context: Context in which this Action is executed.
            table_identifier: The table identifier in the unity catalog in the
                format 'catalog.schema.table'. If not provided, attempts to use the
                context's table metadata.
            mode: The write mode. One of 'append', 'overwrite', 'error',
                'errorifexists', or 'ignore'.
            partition_by: Names of the partitioning columns.
            options: PySpark options for the DataFrame.saveAsTable operation (e.g. mergeSchema:true).

        Raises:
            ValueError: If the table name is not specified or cannot be inferred from
                the context.

        Returns:
            Context after the execution of this Action.
        """
        if not options:
            options = dict()
        if partition_by is None:
            if hasattr(context.table_metadata, "partition_by"):
                partition_by = context.table_metadata.partition_by  # type: ignore

        if (table_metadata := context.table_metadata) and table_identifier is None:
            table_identifier = table_metadata.identifier
        if table_identifier is None:
            raise ValueError("Table name must be specified or a valid Table object with identifier must be set.")

        runtime_info = getattr(context, "runtime_info", None)
        if runtime_info and runtime_info.get("is_delta_load"):
            consume_delta_load(runtime_info)

        writer = CatalogWriter()
        writer.write_table(
            df=context.data,  # type: ignore
            table_identifier=table_identifier,
            mode=mode,
            partition_by=partition_by,
            options=options,
        )
        return context.from_existing()

run(context, *, table_identifier=None, mode='append', partition_by=None, options=None, **_) staticmethod

Writes a DataFrame to a specified catalog table.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| context | PipelineContext | Context in which this Action is executed. | *required* |
| table_identifier | str \| None | The table identifier in the unity catalog in the format 'catalog.schema.table'. If not provided, attempts to use the context's table metadata. | None |
| mode | str | The write mode. One of 'append', 'overwrite', 'error', 'errorifexists', or 'ignore'. | 'append' |
| partition_by | str \| list[str] \| None | Names of the partitioning columns. | None |
| options | dict[str, str] \| None | PySpark options for the DataFrame.saveAsTable operation (e.g. mergeSchema:true). | None |

Raises:

| Type | Description |
|------|-------------|
| ValueError | If the table name is not specified or cannot be inferred from the context. |

Returns:

| Type | Description |
|------|-------------|
| PipelineContext | Context after the execution of this Action. |

Source code in src/cloe_nessy/pipeline/actions/write_catalog_table.py
@staticmethod
def run(
    context: PipelineContext,
    *,
    table_identifier: str | None = None,
    mode: str = "append",
    partition_by: str | list[str] | None = None,
    options: dict[str, str] | None = None,
    **_: Any,
) -> PipelineContext:
    """Writes a DataFrame to a specified catalog table.

    Args:
        context: Context in which this Action is executed.
        table_identifier: The table identifier in the unity catalog in the
            format 'catalog.schema.table'. If not provided, attempts to use the
            context's table metadata.
        mode: The write mode. One of 'append', 'overwrite', 'error',
            'errorifexists', or 'ignore'.
        partition_by: Names of the partitioning columns.
        options: PySpark options for the DataFrame.saveAsTable operation (e.g. mergeSchema:true).

    Raises:
        ValueError: If the table name is not specified or cannot be inferred from
            the context.

    Returns:
        Context after the execution of this Action.
    """
    if not options:
        options = dict()
    if partition_by is None:
        if hasattr(context.table_metadata, "partition_by"):
            partition_by = context.table_metadata.partition_by  # type: ignore

    if (table_metadata := context.table_metadata) and table_identifier is None:
        table_identifier = table_metadata.identifier
    if table_identifier is None:
        raise ValueError("Table name must be specified or a valid Table object with identifier must be set.")

    runtime_info = getattr(context, "runtime_info", None)
    if runtime_info and runtime_info.get("is_delta_load"):
        consume_delta_load(runtime_info)

    writer = CatalogWriter()
    writer.write_table(
        df=context.data,  # type: ignore
        table_identifier=table_identifier,
        mode=mode,
        partition_by=partition_by,
        options=options,
    )
    return context.from_existing()

WriteDeltaAppendAction

Bases: PipelineAction

This class implements an Append action for an ETL pipeline.

The WriteDeltaAppendAction appends a DataFrame to a Delta table.

Example
Write Delta Append:
    action: WRITE_DELTA_APPEND
    options:
        table_identifier: my_catalog.my_schema.my_table
        ignore_empty_df: false

Returns:

None.
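For orientation, a plain Delta append outside the pipeline looks roughly like the sketch below; the actual `DeltaAppendWriter` adds behaviour such as the `ignore_empty_df` handling. `df` and a Delta-enabled session are assumptions:

```python
# Rough equivalent of the example configuration above; df is assumed to be
# the DataFrame in the pipeline context, and a Delta-enabled session is assumed.
(
    df.write
    .format("delta")
    .mode("append")
    .saveAsTable("my_catalog.my_schema.my_table")
)
```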

Source code in src/cloe_nessy/pipeline/actions/write_delta_append.py
class WriteDeltaAppendAction(PipelineAction):
    """This class implements an Append action for an ETL pipeline.

    The WriteDeltaAppendAction appends a DataFrame to a Delta table.

    Example:
        ```yaml
        Write Delta Append:
            action: WRITE_DELTA_APPEND
            options:
                table_identifier: my_catalog.my_schema.my_table
                ignore_empty_df: false
        ```

    Returns:
        None.
    """

    name: str = "WRITE_DELTA_APPEND"

    def run(
        self,
        context: PipelineContext,
        *,
        table_identifier: str | None = None,
        ignore_empty_df: bool = False,
        options: dict[str, Any] | None = None,
        **_: Any,
    ) -> PipelineContext:
        """Merge the dataframe into the delta table.

        Args:
            context: Context in which this Action is executed.
            table_identifier: The identifier of the table. If passed, the
                UC Adapter will be used to create a table object. Otherwise the Table
                object will be created from the table metadata in the context.
            ignore_empty_df: A flag indicating whether to ignore an empty source dataframe.
            options: Additional options for the append writer.

        Raises:
            ValueError: If the table does not exist.
            ValueError: If the data is not set in the pipeline context.
            ValueError: If the table metadata is empty.

        Returns:
            Pipeline Context
        """
        delta_append_writer = DeltaAppendWriter()

        if context.data is None:
            raise ValueError("Data is required for the append operation.")
        if context.table_metadata is None and table_identifier is None:
            raise ValueError("Table metadata or a table identifier are required for the append operation.")

        if table_identifier is not None:
            context.table_metadata = UnityCatalogAdapter().get_table_by_name(table_identifier)
        else:
            if context.table_metadata is None:
                raise ValueError("Table metadata is required.")

        if context.table_metadata is None:
            raise ValueError("Table metadata is required.")

        delta_append_writer.write(
            table_identifier=context.table_metadata.identifier,
            table_location=context.table_metadata.storage_path,
            data_frame=context.data,
            ignore_empty_df=ignore_empty_df,
            options=options,
        )

        runtime_info = getattr(context, "runtime_info", None)
        if runtime_info and runtime_info.get("is_delta_load"):
            consume_delta_load(runtime_info)

        return context.from_existing()

run(context, *, table_identifier=None, ignore_empty_df=False, options=None, **_)

Append the dataframe to the delta table.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| context | PipelineContext | Context in which this Action is executed. | *required* |
| table_identifier | str \| None | The identifier of the table. If passed, the UC Adapter will be used to create a table object. Otherwise the Table object will be created from the table metadata in the context. | None |
| ignore_empty_df | bool | A flag indicating whether to ignore an empty source dataframe. | False |
| options | dict[str, Any] \| None | Additional options for the append writer. | None |

Raises:

| Type | Description |
|------|-------------|
| ValueError | If the table does not exist. |
| ValueError | If the data is not set in the pipeline context. |
| ValueError | If the table metadata is empty. |

Returns:

| Type | Description |
|------|-------------|
| PipelineContext | Pipeline Context |

Source code in src/cloe_nessy/pipeline/actions/write_delta_append.py
def run(
    self,
    context: PipelineContext,
    *,
    table_identifier: str | None = None,
    ignore_empty_df: bool = False,
    options: dict[str, Any] | None = None,
    **_: Any,
) -> PipelineContext:
    """Merge the dataframe into the delta table.

    Args:
        context: Context in which this Action is executed.
        table_identifier: The identifier of the table. If passed, the
            UC Adapter will be used to create a table object. Otherwise the Table
            object will be created from the table metadata in the context.
        ignore_empty_df: A flag indicating whether to ignore an empty source dataframe.
        options: Additional options for the append writer.

    Raises:
        ValueError: If the table does not exist.
        ValueError: If the data is not set in the pipeline context.
        ValueError: If the table metadata is empty.

    Returns:
        Pipeline Context
    """
    delta_append_writer = DeltaAppendWriter()

    if context.data is None:
        raise ValueError("Data is required for the append operation.")
    if context.table_metadata is None and table_identifier is None:
        raise ValueError("Table metadata or a table identifier are required for the append operation.")

    if table_identifier is not None:
        context.table_metadata = UnityCatalogAdapter().get_table_by_name(table_identifier)
    else:
        if context.table_metadata is None:
            raise ValueError("Table metadata is required.")

    if context.table_metadata is None:
        raise ValueError("Table metadata is required.")

    delta_append_writer.write(
        table_identifier=context.table_metadata.identifier,
        table_location=context.table_metadata.storage_path,
        data_frame=context.data,
        ignore_empty_df=ignore_empty_df,
        options=options,
    )

    runtime_info = getattr(context, "runtime_info", None)
    if runtime_info and runtime_info.get("is_delta_load"):
        consume_delta_load(runtime_info)

    return context.from_existing()

WriteDeltaMergeAction

Bases: PipelineAction

This class implements a Merge action for an ETL pipeline.

The WriteDeltaMergeAction merges a DataFrame into a Delta table.

Example
Write Delta Merge:
    action: WRITE_DELTA_MERGE
    options:
        table_identifier: my_catalog.my_schema.my_table
        key_columns:
            - id
            - customer_id
        cols_to_update:
            - name
            - email
            - updated_at
        when_matched_update: true
        when_not_matched_insert: true
        use_partition_pruning: true

Returns:

None.
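For orientation, the kind of merge this action issues can be sketched directly against the delta-spark API. This is a hedged sketch based on the example configuration above, not the `DeltaMergeWriter` implementation; `spark`, `source_df`, and the column lists are assumptions:

```python
# Hedged sketch of a Delta merge with the key columns and update columns from
# the example above; spark and source_df are illustrative assumptions.
from delta.tables import DeltaTable

target = DeltaTable.forName(spark, "my_catalog.my_schema.my_table")
condition = "t.id = s.id AND t.customer_id = s.customer_id"

(
    target.alias("t")
    .merge(source_df.alias("s"), condition)
    .whenMatchedUpdate(set={c: f"s.{c}" for c in ["name", "email", "updated_at"]})
    .whenNotMatchedInsertAll()
    .execute()
)
```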

Source code in src/cloe_nessy/pipeline/actions/write_delta_merge.py
class WriteDeltaMergeAction(PipelineAction):
    """This class implements a Merge action for an ETL pipeline.

    The WriteDeltaMergeAction merges a DataFrame into a Delta table.

    Example:
        ```yaml
        Write Delta Merge:
            action: WRITE_DELTA_MERGE
            options:
                table_identifier: my_catalog.my_schema.my_table
                key_columns:
                    - id
                    - customer_id
                cols_to_update:
                    - name
                    - email
                    - updated_at
                when_matched_update: true
                when_not_matched_insert: true
                use_partition_pruning: true
        ```

    Returns:
        None.
    """

    name: str = "WRITE_DELTA_MERGE"

    def run(
        self,
        context: PipelineContext,
        *,
        table_identifier: str | None = None,
        key_columns: list[str] | None = None,
        cols_to_update: list[str] | None = None,
        cols_to_insert: list[str] | None = None,
        cols_to_exclude: list[str] | None = None,
        when_matched_update: bool = True,
        when_matched_deleted: bool = False,
        when_not_matched_insert: bool = True,
        use_partition_pruning: bool = True,
        ignore_empty_df: bool = False,
        create_if_not_exists: bool = True,
        refresh_table: bool = True,
        **_: Any,
    ) -> PipelineContext:
        """Merge the dataframe into the delta table.

        Args:
            context: Context in which this Action is executed.
            table_identifier: The identifier of the table. If passed, the
                UC Adapter will be used to create a table object. Otherwise the Table
                object will be created from the table metadata in the context.
            key_columns: List of column names that form the
                key for the merge operation.
            when_matched_update: Flag to specify whether to
                perform an update operation when matching records are found in
                the target Delta table.
            when_matched_deleted: Flag to specify whether to
                perform a delete operation when matching records are found in
                the target Delta table.
            when_not_matched_insert: Flag to specify whether to perform an
                insert operation when matching records are not found in the target
                Delta table.
            cols_to_update: List of column names to be
                updated in the target Delta table.
            cols_to_insert: List of column names to be
                inserted into the target Delta table.
            cols_to_exclude: List of column names to be
                excluded from the merge operation.
            use_partition_pruning: Flag to specify whether to use partition
                pruning to optimize the performance of the merge operation.
            ignore_empty_df: A flag indicating whether to ignore an empty source dataframe.
            create_if_not_exists: Create the table if it does not exist.
            refresh_table: Refresh the table after the transaction.

        Raises:
            ValueError: If the table does not exist.
            ValueError: If the data is not set in the pipeline context.
            ValueError: If the table metadata is empty.

        Returns:
            Pipeline Context
        """
        delta_merge_writer = DeltaMergeWriter()

        if context.data is None:
            raise ValueError("Data is required for the merge operation.")
        if context.table_metadata is None and table_identifier is None:
            raise ValueError("Table metadata or a table identifier are required for the merge operation.")

        if table_identifier is not None:
            context.table_metadata = UnityCatalogAdapter().get_table_by_name(table_identifier)
        else:
            if context.table_metadata is None:
                raise ValueError("Table metadata is required.")

        if context.table_metadata is None:
            raise ValueError("Table metadata is required.")

        if create_if_not_exists:
            delta_merge_writer.table_manager.create_table(table=context.table_metadata, ignore_if_exists=True)

        if not delta_merge_writer.table_manager.table_exists(context.table_metadata):
            raise ValueError(f"Table {context.table_metadata.name} does not exist.")

        assert key_columns is not None, "Key columns must be provided."

        delta_merge_writer.write(
            table_identifier=context.table_metadata.identifier,
            storage_path=str(context.table_metadata.storage_path),
            data_frame=context.data,
            key_columns=key_columns,
            cols_to_update=cols_to_update,
            cols_to_insert=cols_to_insert,
            cols_to_exclude=cols_to_exclude,
            when_matched_update=when_matched_update,
            when_matched_deleted=when_matched_deleted,
            when_not_matched_insert=when_not_matched_insert,
            use_partition_pruning=use_partition_pruning,
            partition_by=context.table_metadata.partition_by,
            ignore_empty_df=ignore_empty_df,
        )

        runtime_info = getattr(context, "runtime_info", None)
        if runtime_info and runtime_info.get("is_delta_load"):
            consume_delta_load(runtime_info)

        if refresh_table:
            delta_merge_writer.table_manager.refresh_table(table_identifier=context.table_metadata.identifier)

        return context.from_existing()

run(context, *, table_identifier=None, key_columns=None, cols_to_update=None, cols_to_insert=None, cols_to_exclude=None, when_matched_update=True, when_matched_deleted=False, when_not_matched_insert=True, use_partition_pruning=True, ignore_empty_df=False, create_if_not_exists=True, refresh_table=True, **_)

Merge the dataframe into the delta table.

Parameters:

    context (PipelineContext): Context in which this Action is executed. Required.
    table_identifier (str | None): The identifier of the table. If passed, the UC Adapter will be used to create a table object. Otherwise the Table object will be created from the table metadata in the context. Default: None.
    key_columns (list[str] | None): List of column names that form the key for the merge operation. Default: None.
    when_matched_update (bool): Flag to specify whether to perform an update operation when matching records are found in the target Delta table. Default: True.
    when_matched_deleted (bool): Flag to specify whether to perform a delete operation when matching records are found in the target Delta table. Default: False.
    when_not_matched_insert (bool): Flag to specify whether to perform an insert operation when matching records are not found in the target Delta table. Default: True.
    cols_to_update (list[str] | None): List of column names to be updated in the target Delta table. Default: None.
    cols_to_insert (list[str] | None): List of column names to be inserted into the target Delta table. Default: None.
    cols_to_exclude (list[str] | None): List of column names to be excluded from the merge operation. Default: None.
    use_partition_pruning (bool): Flag to specify whether to use partition pruning to optimize the performance of the merge operation. Default: True.
    ignore_empty_df (bool): A flag indicating whether to ignore an empty source dataframe. Default: False.
    create_if_not_exists (bool): Create the table if it does not exist. Default: True.
    refresh_table (bool): Refresh the table after the transaction. Default: True.

Raises:

    ValueError: If the table does not exist.
    ValueError: If the data is not set in the pipeline context.
    ValueError: If the table metadata is empty.

Returns:

    PipelineContext: Pipeline Context.
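
For orientation, these options correspond to a pipeline step roughly like the sketch below. This is a minimal, illustrative configuration only: the action name WRITE_DELTA_MERGE and the table identifier are assumptions not confirmed on this page, so verify the registered action name before using it.

Write Delta Merge:
    action: WRITE_DELTA_MERGE   # assumed action name, not confirmed on this page
    options:
        table_identifier: my_catalog.my_schema.my_table   # hypothetical target table
        key_columns:
            - id
        cols_to_exclude:
            - _loaded_at
        when_matched_update: True
        when_matched_deleted: False
        when_not_matched_insert: True
        use_partition_pruning: True
        create_if_not_exists: True
        refresh_table: True

Only key_columns has to be supplied in practice (the action asserts it is not None); the remaining flags fall back to the defaults listed above.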

Source code in src/cloe_nessy/pipeline/actions/write_delta_merge.py
def run(
    self,
    context: PipelineContext,
    *,
    table_identifier: str | None = None,
    key_columns: list[str] | None = None,
    cols_to_update: list[str] | None = None,
    cols_to_insert: list[str] | None = None,
    cols_to_exclude: list[str] | None = None,
    when_matched_update: bool = True,
    when_matched_deleted: bool = False,
    when_not_matched_insert: bool = True,
    use_partition_pruning: bool = True,
    ignore_empty_df: bool = False,
    create_if_not_exists: bool = True,
    refresh_table: bool = True,
    **_: Any,
) -> PipelineContext:
    """Merge the dataframe into the delta table.

    Args:
        context: Context in which this Action is executed.
        table_identifier: The identifier of the table. If passed, the
            UC Adapter will be used to create a table object. Otherwise the Table
            object will be created from the table metadata in the context.
        key_columns: List of column names that form the
            key for the merge operation.
        when_matched_update: Flag to specify whether to
            perform an update operation when matching records are found in
            the target Delta table.
        when_matched_deleted: Flag to specify whether to
            perform a delete operation when matching records are found in
            the target Delta table.
        when_not_matched_insert: Flag to specify whether to perform an
            insert operation when matching records are not found in the target
            Delta table.
        cols_to_update: List of column names to be
            updated in the target Delta table.
        cols_to_insert: List of column names to be
            inserted into the target Delta table.
        cols_to_exclude: List of column names to be
            excluded from the merge operation.
        use_partition_pruning: Flag to specify whether to use partition
            pruning to optimize the performance of the merge operation.
        ignore_empty_df: A flag indicating whether to ignore an empty source dataframe.
        create_if_not_exists: Create the table if it does not exist.
        refresh_table: Refresh the table after the transaction.

    Raises:
        ValueError: If the table does not exist.
        ValueError: If the data is not set in the pipeline context.
        ValueError: If the table metadata is empty.

    Returns:
        Pipeline Context
    """
    delta_merge_writer = DeltaMergeWriter()

    if context.data is None:
        raise ValueError("Data is required for the merge operation.")
    if context.table_metadata is None and table_identifier is None:
        raise ValueError("Table metadata or a table identifier are required for the merge operation.")

    if table_identifier is not None:
        context.table_metadata = UnityCatalogAdapter().get_table_by_name(table_identifier)
    else:
        if context.table_metadata is None:
            raise ValueError("Table metadata is required.")

    if context.table_metadata is None:
        raise ValueError("Table metadata is required.")

    if create_if_not_exists:
        delta_merge_writer.table_manager.create_table(table=context.table_metadata, ignore_if_exists=True)

    if not delta_merge_writer.table_manager.table_exists(context.table_metadata):
        raise ValueError(f"Table {context.table_metadata.name} does not exist.")

    assert key_columns is not None, "Key columns must be provided."

    delta_merge_writer.write(
        table_identifier=context.table_metadata.identifier,
        storage_path=str(context.table_metadata.storage_path),
        data_frame=context.data,
        key_columns=key_columns,
        cols_to_update=cols_to_update,
        cols_to_insert=cols_to_insert,
        cols_to_exclude=cols_to_exclude,
        when_matched_update=when_matched_update,
        when_matched_deleted=when_matched_deleted,
        when_not_matched_insert=when_not_matched_insert,
        use_partition_pruning=use_partition_pruning,
        partition_by=context.table_metadata.partition_by,
        ignore_empty_df=ignore_empty_df,
    )

    runtime_info = getattr(context, "runtime_info", None)
    if runtime_info and runtime_info.get("is_delta_load"):
        consume_delta_load(runtime_info)

    if refresh_table:
        delta_merge_writer.table_manager.refresh_table(table_identifier=context.table_metadata.identifier)

    return context.from_existing()

WriteFileAction

Bases: PipelineAction

This class implements a Write action for an ETL pipeline.

The WriteFileAction writes a Dataframe to a storage location defined in the options using the FileWriter class.

Example
Write to File:
    action: WRITE_FILE
    options:
        path: "path/to/location"
        format: "parquet"
        partition_cols: ["date"]
        mode: "append"
        is_stream: False
        options:
            mergeSchema: true
Source code in src/cloe_nessy/pipeline/actions/write_file.py
class WriteFileAction(PipelineAction):
    """This class implements a Write action for an ETL pipeline.

    The WriteFileAction writes a Dataframe to a storage location defined in the
    options using the [`FileWriter`][cloe_nessy.integration.writer.FileWriter] class.

    Example:
        ```yaml
        Write to File:
            action: WRITE_FILE
            options:
                path: "path/to/location"
                format: "parquet"
                partition_cols: ["date"]
                mode: "append"
                is_stream: False
                options:
                    mergeSchema: true
        ```
    """

    name: str = "WRITE_FILE"

    def run(
        self,
        context: PipelineContext,
        *,
        path: str = "",
        format: str = "delta",
        partition_cols: list[str] | None = None,
        mode: str = "append",
        is_stream: bool = False,
        options: dict[str, str] | None = None,
        **_: Any,
    ) -> PipelineContext:
        """Writes a file to a location.

        Args:
            context: Context in which this Action is executed.
            path: Location to write data to.
            format: Format of files to write.
            partition_cols: Columns to partition on. If None, the writer will try to get the partition
                columns from the metadata. Default None.
            mode: Specifies the behavior when data or table already exists.
            is_stream: If True, use the `write_stream` method of the writer.
            options: Additional options passed to the writer.

        Raises:
            ValueError: If no path is provided.
            ValueError: If the data is not set in the pipeline context.

        Returns:
            Pipeline Context
        """
        if not path:
            raise ValueError("No path provided. Please specify path to write data to.")
        if not options:
            options = {}

        if context.data is None:
            raise ValueError("Data context is required for the operation.")

        if partition_cols is None:
            if context.table_metadata is None:
                partition_cols = []
            else:
                partition_cols = context.table_metadata.partition_by
        writer = FileWriter()
        if not is_stream:
            writer.write(
                data_frame=context.data,
                location=path,
                format=format,
                partition_cols=partition_cols,
                mode=mode,
                options=options,
            )
        else:
            writer.write_stream(
                data_frame=context.data,
                location=path,
                format=format,
                mode=mode,
                partition_cols=partition_cols,
                options=options,
            )

        runtime_info = getattr(context, "runtime_info", None)
        if runtime_info and runtime_info.get("is_delta_load"):
            consume_delta_load(runtime_info)

        return context.from_existing()

run(context, *, path='', format='delta', partition_cols=None, mode='append', is_stream=False, options=None, **_)

Writes a file to a location.

Parameters:

    context (PipelineContext): Context in which this Action is executed. Required.
    path (str): Location to write data to. Default: ''.
    format (str): Format of files to write. Default: 'delta'.
    partition_cols (list[str] | None): Columns to partition on. If None, the writer will try to get the partition columns from the metadata. Default: None.
    mode (str): Specifies the behavior when data or table already exists. Default: 'append'.
    is_stream (bool): If True, use the write_stream method of the writer. Default: False.
    options (dict[str, str] | None): Additional options passed to the writer. Default: None.

Raises:

    ValueError: If no path is provided.
    ValueError: If the data is not set in the pipeline context.

Returns:

    PipelineContext: Pipeline Context.
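
As a sketch of the streaming path, the step below sets is_stream to True and forwards a checkpoint location through options. The checkpointLocation key is a standard Spark Structured Streaming option and is an assumption here; whether FileWriter requires it or forwards it unchanged is not stated on this page.

Write Stream to File:
    action: WRITE_FILE
    options:
        path: "path/to/stream/location"
        format: "delta"
        mode: "append"
        is_stream: True
        options:
            checkpointLocation: "path/to/checkpoints"   # assumed Spark streaming option, passed through to the writer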

Source code in src/cloe_nessy/pipeline/actions/write_file.py
def run(
    self,
    context: PipelineContext,
    *,
    path: str = "",
    format: str = "delta",
    partition_cols: list[str] | None = None,
    mode: str = "append",
    is_stream: bool = False,
    options: dict[str, str] | None = None,
    **_: Any,
) -> PipelineContext:
    """Writes a file to a location.

    Args:
        context: Context in which this Action is executed.
        path: Location to write data to.
        format: Format of files to write.
        partition_cols: Columns to partition on. If None, the writer will try to get the partition
            columns from the metadata. Default None.
        mode: Specifies the behavior when data or table already exists.
        is_stream: If True, use the `write_stream` method of the writer.
        options: Additional options passed to the writer.

    Raises:
        ValueError: If no path is provided.
        ValueError: If the data is not set in the pipeline context.

    Returns:
        Pipeline Context
    """
    if not path:
        raise ValueError("No path provided. Please specify path to write data to.")
    if not options:
        options = {}

    if context.data is None:
        raise ValueError("Data context is required for the operation.")

    if partition_cols is None:
        if context.table_metadata is None:
            partition_cols = []
        else:
            partition_cols = context.table_metadata.partition_by
    writer = FileWriter()
    if not is_stream:
        writer.write(
            data_frame=context.data,
            location=path,
            format=format,
            partition_cols=partition_cols,
            mode=mode,
            options=options,
        )
    else:
        writer.write_stream(
            data_frame=context.data,
            location=path,
            format=format,
            mode=mode,
            partition_cols=partition_cols,
            options=options,
        )

    runtime_info = getattr(context, "runtime_info", None)
    if runtime_info and runtime_info.get("is_delta_load"):
        consume_delta_load(runtime_info)

    return context.from_existing()