Index
            ReadAPIAction
              Bases: PipelineAction
Reads data from an API and loads it into a Spark DataFrame.
This action uses the provided API parameters to make a request using the
APIReader and returns a
DataFrame containing the response data.
Example
Read API:
    action: READ_API
    options:
        base_url: https://some_url.com/api/
        endpoint: my/endpoint/
        method: GET
        timeout: 90
        auth:
            - type: basic
              username: my_username
              password: my_password
            - type: secret_scope
              secret_scope: my_secret_scope
              header_template:
                "header_key_1": "<ENVIRONMENT_VARIABLE_NAME>"
            - type: secret_scope
              secret_scope: my_secret_scope
              header_template:
                "header_key_2": "<SECRET_NAME>"
            - type: secret_scope
              secret_scope: my_other_secret_scope
              header_template:
                "header_key_3": "<SECRET_NAME>"
            - type: azure_oauth
              client_id: my_client_id
              client_secret: my_client_secret
              tenant_id: my_tenant_id
              scope: <entra-id-client-id>
The above example will combine the headers from the different auth types. The resulting header will look like this:
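A plausible sketch of the combined headers is shown below. The exact keys and their precedence depend on the configured auth handlers, and secret values are resolved at runtime (placeholders only):

    header_key_1: <resolved value of ENVIRONMENT_VARIABLE_NAME>
    header_key_2: <resolved secret from my_secret_scope>
    header_key_3: <resolved secret from my_other_secret_scope>
    Authorization: <credentials contributed by the basic and azure_oauth handlers>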
Secret information
Don't write sensitive information like passwords or tokens directly in the pipeline configuration. Use secret scopes or environment variables instead.
Source code in src/cloe_nessy/pipeline/actions/read_api.py
            run(context, *, base_url=None, auth=None, default_headers=None, endpoint='', method='GET', key=None, timeout=30, params=None, headers=None, data=None, json=None, max_retries=0, options=None, **_)
  
staticmethod
Reads data from an API into a DataFrame.
This method uses an APIClient to fetch data from the API and load it into a Spark DataFrame.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| context | PipelineContext | The pipeline context containing information about the pipeline. | required | 
| base_url | str | None | The base URL for the API to be called. | None | 
| auth | AuthBase | dict[str, str] | None | The authentication credentials for the API. | None | 
| default_headers | dict[str, str] | None | Default headers to include in the API request. | None | 
| endpoint | str | The specific API endpoint to call. | '' | 
| method | str | The HTTP method to use for the request (default is "GET"). | 'GET' | 
| key | str | None | Key for accessing specific data in the response. | None | 
| timeout | int | Timeout for the API request in seconds (default is 30). | 30 | 
| params | dict[str, str] | None | URL parameters to include in the API request. | None | 
| headers | dict[str, str] | None | Additional headers to include in the request. | None | 
| data | dict[str, str] | None | Data to send with the request for POST methods. | None | 
| json | dict[str, str] | None | JSON data to send with the request for POST methods. | None | 
| max_retries | int | Maximum number of retries for the API request (default is 0). | 0 | 
| options | dict[str, str] | None | Additional options for the API request. | None | 
Returns:
| Type | Description | 
|---|---|
| PipelineContext | The updated pipeline context containing the DataFrame with the API response data. | 
Raises:
| Type | Description | 
|---|---|
| ValueError | If the base_url is not specified. | 
Source code in src/cloe_nessy/pipeline/actions/read_api.py
              
            ReadCatalogTableAction
              Bases: PipelineAction
Reads a table from Unity Catalog using a specified table identifier and optional reader configurations.
This action retrieves data from a catalog table using the CatalogReader.
The table is identified by either the table_identifier parameter or the
table_metadata from the PipelineContext of a previous step. The retrieved
data is loaded into a DataFrame and returned as part of an updated PipelineContext.
Example
Read Sales Table:
    action: READ_CATALOG_TABLE
    options:
        table_identifier: my_catalog.business_schema.sales_table
        options: <options for the CatalogReader read method>
    delta_load_options:
        strategy: CDF
        delta_load_identifier: my_delta_load_id
        strategy_options:
            deduplication_columns: ["id"]
            enable_full_load: true
Source code in src/cloe_nessy/pipeline/actions/read_catalog_table.py
                
            run(context, *, table_identifier=None, options=None, delta_load_options=None, **_)
  
staticmethod
    Reads a table from Unity Catalog using a specified table identifier and optional reader configurations.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| context | PipelineContext | The pipeline's context, which contains metadata and configuration for the action. | required | 
| table_identifier | str | None | The identifier of the catalog table to read. If not provided, the action will attempt to use the table identifier from the table metadata in the context. | None | 
| options | dict[str, str] | None | A dictionary of options for customizing the CatalogReader read method. | None | 
| delta_load_options | dict[Any, Any] | DeltaLoadOptions | None | Options for delta loading, if applicable. Configures the delta load strategy used for incremental reads. | None | 
Raises:
| Type | Description | 
|---|---|
| ValueError | If neither table_identifier nor table metadata from the context is provided. | 
Returns:
| Type | Description | 
|---|---|
| PipelineContext | An updated pipeline context containing the data read from the catalog table as a DataFrame. | 
Source code in src/cloe_nessy/pipeline/actions/read_catalog_table.py
              
            ReadExcelAction
              Bases: PipelineAction
Reads data from an Excel file or directory of Excel files and returns a DataFrame.
The function reads Excel files using the
ExcelDataFrameReader either
from a single file or a directory path. It can read specific sheets, handle
file extensions, and offers various options to customize how the data is
read, such as specifying headers, index columns, and handling missing
values. The resulting data is returned as a DataFrame, and metadata about
the read files can be included in the context.
Example
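A minimal configuration sketch; the directory path and sheet name below are placeholders, and the remaining options follow the run parameters documented further down:

Read Excel:
    action: READ_EXCEL
    options:
        path: excel_file_folder/
        extension: xlsx
        recursive: True
        sheet_name: Sheet1
        sheet_name_as_column: True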
More Options
The READ_EXCEL action supports additional options that can be passed to the
run method. For more information, refer to the method documentation.
Source code in src/cloe_nessy/pipeline/actions/read_excel.py
            run(context, *, file=None, path=None, extension='xlsx', recursive=False, sheet_name=0, sheet_name_as_column=False, header=0, index_col=None, usecols=None, dtype=None, fillna=None, true_values=None, false_values=None, nrows=None, na_values=None, keep_default_na=True, parse_dates=False, date_parser=None, thousands=None, include_index=False, options=None, add_metadata_column=True, load_as_strings=False, **_)
    Reads data from an Excel file or directory of Excel files and returns a DataFrame.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| context | PipelineContext | The context in which the action is executed. | required | 
| file | str | None | The path to a single Excel file. Either file or path must be provided. | None | 
| path | str | None | The directory path containing multiple Excel files. Either file or path must be provided. | None | 
| extension | str | The file extension to look for when reading from a directory. | 'xlsx' | 
| recursive | bool | Whether to include subdirectories when reading from a directory path. | False | 
| sheet_name | str | int | list | The sheet name(s) or index(es) to read from the Excel file. | 0 | 
| sheet_name_as_column | bool | Whether to add a column with the sheet name to the DataFrame. | False | 
| header | int | list[int] | Row number(s) to use as the column labels. | 0 | 
| index_col | int | list[int] | None | Column(s) to use as the index of the DataFrame. | None | 
| usecols | int | str | list | Callable | None | Subset of columns to parse. Can be an integer, string, list, or function. | None | 
| dtype | str | None | Data type for the columns. | None | 
| fillna | str | dict[str, list[str]] | dict[str, str] | None | Method or value to use to fill NaN values. | None | 
| true_values | list | None | Values to consider as True. | None | 
| false_values | list | None | Values to consider as False. | None | 
| nrows | int | None | Number of rows to parse. | None | 
| na_values | list[str] | dict[str, list[str]] | None | Additional strings to recognize as NaN/NA. | None | 
| keep_default_na | bool | Whether to append the default NaN values when custom na_values are specified. | True | 
| parse_dates | bool | list | dict | Options for parsing date columns. | False | 
| date_parser | Callable | None | Function to use for converting strings to datetime objects. | None | 
| thousands | str | None | Thousands separator to use when parsing numeric columns. | None | 
| include_index | bool | Whether to include an index column in the output DataFrame. | False | 
| options | dict | None | Additional options to pass to the DataFrame reader. | None | 
| add_metadata_column | bool | Whether to add a metadata column with file information to the DataFrame. | True | 
| load_as_strings | bool | Whether to load all columns as strings. | False | 
Raises:
| Type | Description | 
|---|---|
| ValueError | Raised if both file and path are provided. | 
Returns:
| Type | Description | 
|---|---|
| PipelineContext | The updated context, with the read data as a DataFrame. | 
Source code in src/cloe_nessy/pipeline/actions/read_excel.py
            ReadFilesAction
              Bases: PipelineAction
Reads files from a specified location.
If an extension is provided, all files with the given extension will be read
using the FileReader. If no
extension is provided, the spark_format must be set, and all files in the
location will be read using a DataFrameReader with the specified format.
Example
Read Files:
    action: READ_FILES
    options:
        location: json_file_folder/
        search_subdirs: True
        spark_format: JSON
Define Spark Format
Use the spark_format option to specify the format with which
to read the files. Supported formats are e.g., CSV, JSON,
PARQUET, TEXT, and XML.
Read Files:
    action: READ_FILES
    options:
        location: csv_file_folder/
        search_subdirs: True
        extension: csv
Define Extension
Use the extension option to specify the extension of the files
to read. If spark_format is not specified, it will be derived from
the extension.
Read Files:
    action: READ_FILES
    options:
        location: file_folder/
        extension: abc_custom_extension  # specifies the files to read
        spark_format: CSV  # specifies the format to read the files with
Define both Extension & Spark Format
Use the extension option to specify the extension of the files
to read. Additionally, use the spark_format option to specify
the format with which to read the files.
Read Delta Files:
    action: READ_FILES
    options:
        location: /path/to/delta/table
        spark_format: delta
    delta_load_options:
        strategy: CDF
        delta_load_identifier: my_delta_files_load
        strategy_options:
            deduplication_columns: ["id"]
            enable_full_load: false
Delta Loading for Files
Use delta_load_options when reading Delta Lake tables to enable
incremental loading. This works with both CDF and timestamp strategies.
Source code in src/cloe_nessy/pipeline/actions/read_files.py
            run(context, *, location=None, search_subdirs=False, extension=None, spark_format=None, schema=None, add_metadata_column=True, options=None, delta_load_options=None, **_)
  
staticmethod
    Reads files from a specified location.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| context | PipelineContext | The context in which this Action is executed. | required | 
| location | str | None | The location from which to read files. | None | 
| search_subdirs | bool | Recursively search subdirectories for files if an extension is provided. | False | 
| extension | str | None | The file extension to filter files by. | None | 
| spark_format | str | None | The format to use for reading the files. If not provided, it will be inferred from the file extension. | None | 
| schema | str | None | The schema of the data. If None, schema is obtained from the context metadata. | None | 
| add_metadata_column | bool | Whether to include a metadata column with file information in the DataFrame. | True | 
| options | dict[str, str] | None | Additional options passed to the reader. | None | 
| delta_load_options | dict[Any, Any] | DeltaLoadOptions | None | Options for delta loading, if applicable. When provided for Delta format files, enables incremental loading using delta loader strategies. | None | 
Raises:
| Type | Description | 
|---|---|
| ValueError | If neither extension nor spark_format is provided. | 
Returns:
| Type | Description | 
|---|---|
| PipelineContext | The context after the Action has been executed, containing the read data as a DataFrame. | 
Source code in src/cloe_nessy/pipeline/actions/read_files.py
            ReadMetadataYAMLAction
              Bases: PipelineAction
Reads schema metadata from a yaml file using the Schema model.
Example
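A minimal sketch, assuming the action key READ_METADATA_YAML (following the ClassName-to-UPPER_SNAKE_CASE convention used by the other actions) and placeholder values:

Read Table Metadata:
    action: READ_METADATA_YAML
    options:
        path: data_contracts/
        file_name: my_schema.yaml
        table_name: my_table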
Source code in src/cloe_nessy/pipeline/actions/read_metadata_yaml.py
                
            run(context, *, path=None, file_name=None, table_name=None, **_)
  
staticmethod
    Reads schema metadata from a yaml file using the Schema model.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| context | PipelineContext | The context in which this Action is executed. | required | 
| path | str | None | The path to the data contract directory. | None | 
| file_name | str | None | The name of the file that defines the schema. | None | 
| table_name | str | None | The name of the table for which to retrieve metadata. | None | 
Raises:
| Type | Description | 
|---|---|
| ValueError | If any issues occur while reading the schema, such as an invalid schema, missing file, or missing path. | 
Returns:
| Type | Description | 
|---|---|
| PipelineContext | The context after the execution of this Action, containing the table metadata. | 
Source code in src/cloe_nessy/pipeline/actions/read_metadata_yaml.py
              
            TransformChangeDatatypeAction
              Bases: PipelineAction
Changes the datatypes of specified columns in the given DataFrame.
Data Types
We make use of the PySpark cast function to change the data types of
the columns. Valid data types can be found in the PySpark
documentation.
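As a sketch, assuming the action key TRANSFORM_CHANGE_DATATYPE (following the naming convention of the other actions), the columns option maps column names to target data types:

Change Datatypes:
    action: TRANSFORM_CHANGE_DATATYPE
    options:
        columns:
            revenue: double
            order_date: date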
Source code in src/cloe_nessy/pipeline/actions/transform_change_datatype.py
                
            run(context, *, columns=None, **_)
    Changes the datatypes of specified columns in the given DataFrame.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| context | PipelineContext | The context in which this Action is executed. | required | 
| columns | dict[str, str] | None | A dictionary where the key is the column name and the value is the desired datatype. | None | 
Raises:
| Type | Description | 
|---|---|
| ValueError | If no columns are provided. | 
| ValueError | If the data from context is None. | 
Returns:
| Type | Description | 
|---|---|
| PipelineContext | The context after the execution of this Action, containing the DataFrame with updated column datatypes. | 
Source code in src/cloe_nessy/pipeline/actions/transform_change_datatype.py
              
            TransformCleanColumnNamesAction
              Bases: PipelineAction
Fixes column names in the DataFrame to be valid.
Removes invalid characters from the column names (including the fields of a struct) and replaces a single leading underscore with a double underscore.
Invalid characters include:
- Any non-word character (anything other than letters, digits, and underscores).
- A single leading underscore.
Source code in src/cloe_nessy/pipeline/actions/transform_clean_column_names.py
                
            run(context, **_)
    Fixes column names in the DataFrame to be valid.
Removes invalid characters from the column names (including the fields of a struct) and replaces a single leading underscore with a double underscore.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| context | PipelineContext | The context in which this Action is executed. | required | 
Raises:
| Type | Description | 
|---|---|
| ValueError | If the data from the context is None. | 
Returns:
| Type | Description | 
|---|---|
| PipelineContext | The context after the execution of this Action, containing the DataFrame with cleaned column names. | 
Source code in src/cloe_nessy/pipeline/actions/transform_clean_column_names.py
              
            TransformConcatColumnsAction
              Bases: PipelineAction
Concatenates the specified columns in the given DataFrame.
Example
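A minimal sketch, assuming the action key TRANSFORM_CONCAT_COLUMNS; the column names are placeholders:

Concat Name Columns:
    action: TRANSFORM_CONCAT_COLUMNS
    options:
        name: full_name
        columns:
            - first_name
            - last_name
        separator: " "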
Source code in src/cloe_nessy/pipeline/actions/transform_concat_columns.py
                
            run(context, *, name='', columns=None, separator=None, **_)
    Concatenates the specified columns in the given DataFrame.
Warning
Null Handling Behavior
The behavior of null handling differs based on whether a separator is provided:
- When separator is specified: the function uses Spark's concat_ws, which ignores NULL values. In this case, NULL values are treated as empty strings ("") and are excluded from the final concatenated result.
- When separator is not specified: the function defaults to Spark's concat, which returns NULL if any of the concatenated values is NULL. This means the presence of a NULL in any input will make the entire output NULL.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| context | PipelineContext | The context in which this Action is executed. | required | 
| name | str | The name of the new concatenated column. | '' | 
| columns | list[str] | None | A list of columns to be concatenated. | None | 
| separator | str | None | The separator used between concatenated column values. | None | 
Raises:
| Type | Description | 
|---|---|
| ValueError | If no name is provided. | 
| ValueError | If no columns are provided. | 
| ValueError | If the data from context is None. | 
| ValueError | If 'columns' is not a list. | 
Returns:
| Type | Description | 
|---|---|
| PipelineContext | The context after the execution of this Action, containing the DataFrame with the concatenated column. | 
Source code in src/cloe_nessy/pipeline/actions/transform_concat_columns.py
              
            TransformDecodeAction
              Bases: PipelineAction
Decodes values of a specified column in the DataFrame based on the given format.
Example
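A minimal sketch, assuming the action key TRANSFORM_DECODE; the column name is a placeholder, and the schema is omitted so it is inferred from the first row:

Decode Payload:
    action: TRANSFORM_DECODE
    options:
        column: payload
        input_format: json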
Source code in src/cloe_nessy/pipeline/actions/transform_decode.py
            _decode_base64(df, column, base64_schema)
    Decode base64 column.
Source code in src/cloe_nessy/pipeline/actions/transform_decode.py
              
            
            _decode_json(df, column, json_schema)
    Decode json column.
Source code in src/cloe_nessy/pipeline/actions/transform_decode.py
              
            run(context, *, column=None, input_format=None, schema=None, **_)
    Decodes values of a specified column in the DataFrame based on the given format.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| context | PipelineContext | The context in which this Action is executed. | required | 
| column | str | None | The name of the column that should be decoded. | None | 
| input_format | str | None | The format from which the column should be decoded. Currently supported formats are 'base64' and 'json'. | None | 
| schema | str | None | For JSON input, the schema of the JSON object. If empty, the schema is inferred from the first row of the DataFrame. For base64 input, the data type to which the column is cast. | None | 
Raises:
| Type | Description | 
|---|---|
| ValueError | If no column is specified. | 
| ValueError | If no input_format is specified. | 
| ValueError | If the data from context is None. | 
| ValueError | If an invalid input_format is provided. | 
Returns:
| Type | Description | 
|---|---|
| PipelineContext | The context after the execution of this Action, containing the DataFrame with the decoded column(s). | 
Source code in src/cloe_nessy/pipeline/actions/transform_decode.py
              
            TransformDistinctAction
              Bases: PipelineAction
Selects distinct rows from the DataFrame in the given context.
If a subset is given, these columns are used for duplicate comparison; otherwise, all columns are used.
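For illustration, assuming the action key TRANSFORM_DISTINCT (following the naming convention of the other actions), deduplication on a subset of columns can be configured like this:

Distinct Customers:
    action: TRANSFORM_DISTINCT
    options:
        subset:
            - customer_id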
Source code in src/cloe_nessy/pipeline/actions/transform_distinct.py
                
            run(context, *, subset=None, **_)
    Selects distinct rows from the DataFrame in the given context.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| context | PipelineContext | The context in which this Action is executed. | required | 
| subset | list[str] | None | List of column names to use for duplicate comparison (defaults to all columns). | None | 
Raises:
| Type | Description | 
|---|---|
| ValueError | If the data from the context is None. | 
Returns:
| Type | Description | 
|---|---|
| PipelineContext | The context after the execution of this Action, containing the DataFrame with distinct rows. | 
Source code in src/cloe_nessy/pipeline/actions/transform_distinct.py
              
            TransformFilterAction
              Bases: PipelineAction
Filters the DataFrame in the given context based on a specified condition.
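For illustration, assuming the action key TRANSFORM_FILTER (following the naming convention of the other actions), the condition is a SQL-like expression applied to the DataFrame:

Filter Active Rows:
    action: TRANSFORM_FILTER
    options:
        condition: status = 'active' AND revenue > 0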
Source code in src/cloe_nessy/pipeline/actions/transform_filter.py
                
            run(context, *, condition='', **_)
    Filters the DataFrame in the given context based on a specified condition.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| context | PipelineContext | Context in which this Action is executed. | required | 
| condition | str | A SQL-like expression used to filter the DataFrame. | '' | 
Raises:
| Type | Description | 
|---|---|
| ValueError | If no condition is provided. | 
| ValueError | If the data from the context is None. | 
Returns:
| Type | Description | 
|---|---|
| PipelineContext | Context after the execution of this Action, containing the filtered DataFrame. | 
Source code in src/cloe_nessy/pipeline/actions/transform_filter.py
              
            TransformGroupAggregate
              Bases: PipelineAction
Performs aggregation operations on grouped data within a DataFrame.
This class allows you to group data by specified columns and apply various aggregation functions to other columns. The aggregation functions can be specified as a dictionary where keys are column names and values are either a single aggregation function or a list of functions.
The output DataFrame will contain the grouped columns and the aggregated columns with the aggregation function as a prefix to the column name.
Example
Transform Group Aggregate:
    action: TRANSFORM_GROUP_AGGREGATE
    options:
        grouping_columns:
            - column1
            - column2
        aggregations:
            column3:
                - sum
                - avg
            column4: max
This example groups the DataFrame by column1 and column2 and aggregates column3 by sum and average
and column4 by max. The resulting DataFrame will contain the grouped columns column1 and column2
and the aggregated columns sum_column3, avg_column3, and max_column4.
Source code in src/cloe_nessy/pipeline/actions/transform_group_aggregate.py
                
            run(context, *, grouping_columns=None, aggregations=None, **_)
    Executes the aggregation on the grouped data.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| context | PipelineContext | The context in which this action is executed. | required | 
| grouping_columns | list[str] | None | A list of columns to group by. | None | 
| aggregations | dict[str, str | list] | None | A dictionary where keys are column names and values are either a single aggregation function or a list of functions. | None | 
Raises:
| Type | Description | 
|---|---|
| ValueError | If the context data is None. | 
| ValueError | If no aggregations are provided. | 
| ValueError | If invalid aggregation operations are provided. | 
| ValueError | If columns with unsupported data types are included in the aggregations. | 
Returns:
| Name | Type | Description | 
|---|---|---|
| PipelineContext | PipelineContext | The context after the execution of this action. | 
Source code in src/cloe_nessy/pipeline/actions/transform_group_aggregate.py
              
            TransformHashColumnsAction
              Bases: PipelineAction
Hashes specified columns in a DataFrame using a chosen algorithm.
Example
Given a hash_config that hashes column1 and column2 into new hashed columns, and a DataFrame df with the following structure:
| column1 | column2 | column3 | 
|---|---|---|
| foo | bar | baz | 
After running the action, the resulting DataFrame will look like:
| column1 | column2 | column3 | hashed_column1 | hashed_column2 | 
|---|---|---|---|---|
| foo | bar | baz | 17725b837e9c896e7123b142eb980131dcc0baa6160db45d4adfdb21 | 1670361220 | 
Hash values might vary
The actual hash values will depend on the hashing algorithm used and the input data.
Source code in src/cloe_nessy/pipeline/actions/transform_hash_columns.py
            run(context, *, hash_config=None, **_)
    Hashes the specified columns in the DataFrame.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| context | PipelineContext | Context in which this Action is executed. | required | 
| hash_config | HashConfig | None | Dictionary that contains the configuration for executing the hashing. | None | 
Returns:
| Type | Description | 
|---|---|
| PipelineContext | Updated PipelineContext with hashed columns. | 
Raises:
| Type | Description | 
|---|---|
| ValueError | If columns are missing, data is None, or algorithm/bits are invalid. | 
| ValueError | If the hash configuration is invalid. | 
Source code in src/cloe_nessy/pipeline/actions/transform_hash_columns.py
              
            TransformJoinAction
              Bases: PipelineAction
Joins the current DataFrame with another DataFrame defined in joined_data.
The join operation is performed based on specified columns and the type of
join indicated by the how parameter. Supported join types are listed in the
PySpark documentation.
Example
Join Tables:
    action: TRANSFORM_JOIN
    options:
        joined_data: ((step:Transform First Table))
        join_on: id
        how: anti
Referencing a DataFrame from another step
The joined_data parameter is a reference to the DataFrame from another step.
The DataFrame is accessed using the result attribute of the PipelineStep. The syntax
for referencing the DataFrame is ((step:Step Name)), mind the double parentheses.
Source code in src/cloe_nessy/pipeline/actions/transform_join.py
                
            run(context, *, joined_data=None, join_on=None, how='inner', **_)
    Joins the current DataFrame with another DataFrame defined in joined_data.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| context | PipelineContext | Context in which this Action is executed. | required | 
| joined_data | PipelineStep | None | The PipelineStep context defining the DataFrame to join with as the right side of the join. | None | 
| join_on | list[str] | str | dict[str, str] | None | A string for the join column name, a list of column names, or a dictionary mapping columns from the left DataFrame to the right DataFrame. This defines the condition for the join operation. | None | 
| how | str | The type of join to perform, e.g., inner, cross, outer, full, fullouter, left, leftouter, right, rightouter, semi, or anti. | 'inner' | 
Raises:
| Type | Description | 
|---|---|
| ValueError | If no joined_data is provided. | 
| ValueError | If no join_on is provided. | 
| ValueError | If the data from context is None. | 
| ValueError | If the data from the joined_data is None. | 
Returns:
| Type | Description | 
|---|---|
| PipelineContext | Context after the execution of this Action, containing the result of the join operation. | 
Source code in src/cloe_nessy/pipeline/actions/transform_join.py
              
            TransformJsonNormalize
              Bases: PipelineAction
Normalizes and flattens the DataFrame by exploding array columns and flattening struct columns.
The method performs recursive normalization on the DataFrame present in the context, ensuring that the order of columns is retained and new columns created by flattening structs are appended after existing columns.
Example
Example Input Data:
| id | name | coordinates | attributes | 
|---|---|---|---|
| 1 | Alice | [10.0, 20.0] | {"age": 30, "city": "NY"} | 
| 2 | Bob | [30.0, 40.0] | {"age": 25, "city": "LA"} | 
Example Output Data:
| id | name | coordinates | attributes_age | attributes_city | 
|---|---|---|---|---|
| 1 | Alice | [10.0, 20.0] | 30 | NY | 
| 2 | Bob | [30.0, 40.0] | 25 | LA | 
Source code in src/cloe_nessy/pipeline/actions/transform_json_normalize.py
            _normalize(df, exclude_columns)
  
staticmethod
    Recursively normalizes the given DataFrame by exploding arrays and flattening structs.
This method performs two primary operations:
1. Explodes any array columns, unless they are in the list of excluded columns.
2. Flattens any struct columns, renaming nested fields and appending them to the top-level DataFrame.
The method continues these operations in a loop until there are no array or struct columns left.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| df |  | The input DataFrame to normalize. | required | 
| exclude_columns |  | A list of column names to exclude from the normalization process. These columns will not be exploded or flattened. | required | 
Returns:
| Type | Description | 
|---|---|
| pyspark.sql.DataFrame | The normalized DataFrame with no array or struct columns. | 
Source code in src/cloe_nessy/pipeline/actions/transform_json_normalize.py
              
            run(context, *, exclude_columns=None, **_)
    Executes the normalization process on the DataFrame present in the context.
Please note that columns retain their relative order during the normalization process, and new columns created by flattening structs are appended after the existing columns.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| context | PipelineContext | The pipeline context that contains the DataFrame to be normalized. | required | 
| exclude_columns | list[str] | None | A list of column names to exclude from the normalization process. These columns will not be exploded or flattened. | None | 
| **_ | Any | Additional keyword arguments (not used). | {} | 
Returns:
| Type | Description | 
|---|---|
| PipelineContext | A new pipeline context with the normalized DataFrame. | 
Raises:
| Type | Description | 
|---|---|
| ValueError | If the DataFrame in the context is None. | 
Source code in src/cloe_nessy/pipeline/actions/transform_json_normalize.py
              
            TransformRenameColumnsAction
              Bases: PipelineAction
Renames the specified columns in the DataFrame.
This method updates the DataFrame in the provided context by renaming columns according
to the mapping defined in the columns dictionary, where each key represents an old column
name and its corresponding value represents the new column name.
Example
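A minimal sketch, assuming the action key TRANSFORM_RENAME_COLUMNS; keys are the old column names, values the new ones:

Rename Columns:
    action: TRANSFORM_RENAME_COLUMNS
    options:
        columns:
            cust_id: customer_id
            rev: revenue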
Source code in src/cloe_nessy/pipeline/actions/transform_rename_columns.py
                
            run(context, *, columns=None, **_)
    Renames the specified columns in the DataFrame.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| context | PipelineContext | Context in which this Action is executed. | required | 
| columns | dict[str, str] | None | A dictionary where the key is the old column name and the value is the new column name. | None | 
Raises:
| Type | Description | 
|---|---|
| ValueError | If no columns are provided. | 
| ValueError | If the data from context is None. | 
Returns:
| Type | Description | 
|---|---|
| PipelineContext | Context after the execution of this Action. | 
Source code in src/cloe_nessy/pipeline/actions/transform_rename_columns.py
              
            TransformReplaceValuesAction
              Bases: PipelineAction
Replaces specified values in the given DataFrame.
This method iterates over the specified replace dictionary, where each key is a column name
and each value is another dictionary containing old values as keys and new values as the corresponding
values. The method updates the DataFrame by replacing occurrences of the old values with the new ones
in the specified columns.
Example
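A minimal sketch, assuming the action key TRANSFORM_REPLACE_VALUES; each key is a column name whose mapping replaces old values with new ones:

Replace Country Codes:
    action: TRANSFORM_REPLACE_VALUES
    options:
        replace:
            country:
                UK: GB
                D: DE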
Source code in src/cloe_nessy/pipeline/actions/transform_replace_values.py
                
            run(context, *, replace=None, **_)
    Replaces specified values in the given DataFrame.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| context | PipelineContext | Context in which this Action is executed. | required | 
| replace | dict[str, dict[str, str]] | None | A dictionary where each key is the column name and the corresponding value is another dictionary mapping old values to new values. | None | 
Raises:
| Type | Description | 
|---|---|
| ValueError | If no replace values are provided. | 
| ValueError | If the data from context is None. | 
Returns:
| Type | Description | 
|---|---|
| PipelineContext | Context after the execution of this Action. | 
Source code in src/cloe_nessy/pipeline/actions/transform_replace_values.py
              
            TransformSelectColumnsAction
              Bases: PipelineAction
Selects specified columns from the given DataFrame.
This method allows you to include or exclude specific columns from the
DataFrame. If include_columns is provided, only those columns will be
selected. If exclude_columns is provided, all columns except those will be
selected. The method ensures that the specified columns exist in the
DataFrame before performing the selection.
Example
Example Input Data:
| id | name | coordinates | attributes | 
|---|---|---|---|
| 1 | Alice | [10.0, 20.0] | {"age": 30, "city": "NY"} | 
| 2 | Bob | [30.0, 40.0] | {"age": 25, "city": "LA"} | 
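A configuration sketch, assuming the action key TRANSFORM_SELECT_COLUMNS, that keeps only the id and name columns from the input above:

Select Columns:
    action: TRANSFORM_SELECT_COLUMNS
    options:
        include_columns:
            - id
            - name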
Source code in src/cloe_nessy/pipeline/actions/transform_select_columns.py
            run(context, *, include_columns=None, exclude_columns=None, raise_on_non_existing_columns=True, **_)
    Selects specified columns from the given DataFrame.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| context | PipelineContext | Context in which this Action is executed. | required | 
| include_columns | list[str] | None | A list of column names that should be included. If provided, only these columns will be selected. | None | 
| exclude_columns | list[str] | None | A list of column names that should be excluded. If provided, all columns except these will be selected. | None | 
| raise_on_non_existing_columns | bool | If True, raise an error if a specified column is not found in the DataFrame. If False, ignore the column and continue with the selection. | True | 
Raises:
| Type | Description | 
|---|---|
| ValueError | If a specified column is not found in the DataFrame. | 
| ValueError | If neither include_columns nor exclude_columns are provided, or if both are provided. | 
Returns:
| Type | Description | 
|---|---|
| PipelineContext | Context after the execution of this Action. | 
Source code in src/cloe_nessy/pipeline/actions/transform_select_columns.py
              
            TransformSqlAction
              Bases: PipelineAction
Executes a SQL statement on a DataFrame within the provided context.
A temporary view is created from the current DataFrame, and the SQL statement is executed on that view. The resulting DataFrame is returned.
Example
SQL Transform:
    action: TRANSFORM_SQL
    options:
        sql_statement: select city, revenue, firm from {DATA_FRAME} where product="Databricks"
Note
The SQL statement must reference the DataFrame as "{DATA_FRAME}". This nessy-specific placeholder will be replaced with your input DataFrame from the context. If your pipeline is defined as an f-string, you can escape the curly braces by doubling them, e.g., "{{DATA_FRAME}}".
Source code in src/cloe_nessy/pipeline/actions/transform_generic_sql.py
                
            run(context, *, sql_statement='', **kwargs)
    Executes a SQL statement on a DataFrame within the provided context.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| context | PipelineContext | Context in which this Action is executed. | required | 
| sql_statement | str | A string containing the SQL statement to be executed. The source table should be referred to as "{DATA_FRAME}". | '' | 
| **kwargs | Any | Additional keyword arguments are passed as placeholders to the SQL statement. | {} | 
Raises:
| Type | Description | 
|---|---|
| ValueError | If "{DATA_FRAME}" is not included in the SQL statement. | 
| ValueError | If no SQL statement is provided. | 
| ValueError | If the data from the context is None. | 
Returns:
| Type | Description | 
|---|---|
| PipelineContext | Context after the execution of this Action, containing the DataFrame resulting from the SQL statement. | 
Source code in src/cloe_nessy/pipeline/actions/transform_generic_sql.py
              
            TransformUnionAction
              Bases: PipelineAction
Unions multiple DataFrames together.
This method takes the current DataFrame from the context and unites it with
additional DataFrames specified in the union_data argument. All DataFrames
must have the same schema. If any DataFrame in union_data is None or
empty, a ValueError will be raised.
Example
Union Tables:
    action: TRANSFORM_UNION
    options:
        union_data:
            - ((step: Filter First Table))
            - ((step: SQL Transform Second Table))
Referencing a DataFrame from another step
The union_data parameter is a reference to the DataFrame from another step.
The DataFrame is accessed using the result attribute of the PipelineStep. The syntax
for referencing the DataFrame is ((step:Step Name)), mind the double parentheses.
Source code in src/cloe_nessy/pipeline/actions/transform_union.py
                
            run(context, *, union_data=None, **_)
    Unions multiple DataFrames together.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| context | PipelineContext | Context in which this Action is executed. | required | 
| union_data | list[PipelineStep] | None | A list of PipelineSteps that define the DataFrames to union with the current context. | None | 
Raises:
| Type | Description | 
|---|---|
| ValueError | If no union_data is provided. | 
| ValueError | If the data from context is None. | 
| ValueError | If the data from any of the union_data is None. | 
Returns:
| Type | Description | 
|---|---|
| PipelineContext | Context after the execution of this Action. | 
Source code in src/cloe_nessy/pipeline/actions/transform_union.py
              
            WriteCatalogTableAction
              Bases: PipelineAction
Writes a DataFrame to a specified catalog table using CatalogWriter.
Example
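A minimal sketch, assuming the action key WRITE_CATALOG_TABLE; the table identifier and writer options are placeholders:

Write Sales Table:
    action: WRITE_CATALOG_TABLE
    options:
        table_identifier: my_catalog.business_schema.sales_table
        mode: append
        partition_by: sale_date
        options:
            mergeSchema: "true"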
Source code in src/cloe_nessy/pipeline/actions/write_catalog_table.py
                
            run(context, *, table_identifier=None, mode='append', partition_by=None, options=None, **_)
  
staticmethod
    Writes a DataFrame to a specified catalog table.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| context | PipelineContext | Context in which this Action is executed. | required | 
| table_identifier | str | None | The table identifier in the unity catalog in the format 'catalog.schema.table'. If not provided, attempts to use the context's table metadata. | None | 
| mode | str | The write mode. One of 'append', 'overwrite', 'error', 'errorifexists', or 'ignore'. | 'append' | 
| partition_by | str | list[str] | None | Names of the partitioning columns. | None | 
| options | dict[str, str] | None | PySpark options for the DataFrame.saveAsTable operation (e.g. mergeSchema:true). | None | 
Raises:
| Type | Description | 
|---|---|
| ValueError | If the table name is not specified or cannot be inferred from the context. | 
Returns:
| Type | Description | 
|---|---|
| PipelineContext | Context after the execution of this Action. | 
Source code in src/cloe_nessy/pipeline/actions/write_catalog_table.py
              
            WriteDeltaAppendAction
              Bases: PipelineAction
This class implements an Append action for an ETL pipeline.
The WriteDeltaAppendAction appends a DataFrame to a Delta table.
Example
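A minimal sketch, assuming the action key WRITE_DELTA_APPEND; the table identifier is a placeholder:

Append To Delta Table:
    action: WRITE_DELTA_APPEND
    options:
        table_identifier: my_catalog.business_schema.sales_table
        ignore_empty_df: True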
Returns:
| Type | Description | 
|---|---|
| None. | 
Source code in src/cloe_nessy/pipeline/actions/write_delta_append.py
                
            run(context, *, table_identifier=None, ignore_empty_df=False, options=None, **_)
Appends the DataFrame to the Delta table.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| context | PipelineContext | Context in which this Action is executed. | required | 
| table_identifier | str | None | The identifier of the table. If passed, the UC Adapter will be used to create a table object. Otherwise the Table object will be created from the table metadata in the context. | None | 
| ignore_empty_df | bool | A flag indicating whether to ignore an empty source dataframe. | False | 
| options | dict[str, Any] | None | Additional options for the append writer. | None | 
Raises:
| Type | Description | 
|---|---|
| ValueError | If the table does not exist. | 
| ValueError | If the data is not set in the pipeline context. | 
| ValueError | If the table metadata is empty. | 
Returns:
| Type | Description | 
|---|---|
| PipelineContext | Pipeline Context | 
Source code in src/cloe_nessy/pipeline/actions/write_delta_append.py
              
            WriteDeltaMergeAction
              Bases: PipelineAction
This class implements a Merge action for an ETL pipeline.
The WriteDeltaMergeAction merges a DataFrame into a Delta table.
Example
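A minimal sketch, assuming the action key WRITE_DELTA_MERGE; the table identifier and key columns are placeholders, and the flags mirror the run parameters below:

Merge Into Delta Table:
    action: WRITE_DELTA_MERGE
    options:
        table_identifier: my_catalog.business_schema.sales_table
        key_columns:
            - id
        when_matched_update: True
        when_not_matched_insert: True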
Returns:
| Type | Description | 
|---|---|
| None. | 
Source code in src/cloe_nessy/pipeline/actions/write_delta_merge.py
            run(context, *, table_identifier=None, key_columns=None, cols_to_update=None, cols_to_insert=None, cols_to_exclude=None, when_matched_update=True, when_matched_deleted=False, when_not_matched_insert=True, use_partition_pruning=True, ignore_empty_df=False, create_if_not_exists=True, refresh_table=True, **_)
Merges the DataFrame into the Delta table.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| context | PipelineContext | Context in which this Action is executed. | required | 
| table_identifier | str | None | The identifier of the table. If passed, the UC Adapter will be used to create a table object. Otherwise the Table object will be created from the table metadata in the context. | None | 
| key_columns | list[str] | None | List of column names that form the key for the merge operation. | None | 
| when_matched_update | bool | Flag to specify whether to perform an update operation when matching records are found in the target Delta table. | True | 
| when_matched_deleted | bool | Flag to specify whether to perform a delete operation when matching records are found in the target Delta table. | False | 
| when_not_matched_insert | bool | Flag to specify whether to perform an insert operation when matching records are not found in the target Delta table. | True | 
| cols_to_update | list[str] | None | List of column names to be updated in the target Delta table. | None | 
| cols_to_insert | list[str] | None | List of column names to be inserted into the target Delta table. | None | 
| cols_to_exclude | list[str] | None | List of column names to be excluded from the merge operation. | None | 
| use_partition_pruning | bool | Flag to specify whether to use partition pruning to optimize the performance of the merge operation. | True | 
| ignore_empty_df | bool | A flag indicating whether to ignore an empty source dataframe. | False | 
| create_if_not_exists | bool | Create the table if it does not exist. | True | 
| refresh_table | bool | Refresh the table after the transaction. | True | 
Raises:
| Type | Description | 
|---|---|
| ValueError | If the table does not exist. | 
| ValueError | If the data is not set in the pipeline context. | 
| ValueError | If the table metadata is empty. | 
Returns:
| Type | Description | 
|---|---|
| PipelineContext | Pipeline Context | 
Source code in src/cloe_nessy/pipeline/actions/write_delta_merge.py
            WriteFileAction
              Bases: PipelineAction
This class implements a Write action for an ETL pipeline.
The WriteFileAction writes a DataFrame to a storage location defined in the
options, using the FileWriter class.
Example
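A minimal sketch, assuming the action key WRITE_FILE; the path is a placeholder:

Write Files:
    action: WRITE_FILE
    options:
        path: /path/to/output/
        format: delta
        mode: append
        partition_cols:
            - sale_date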
Source code in src/cloe_nessy/pipeline/actions/write_file.py
                
            run(context, *, path='', format='delta', partition_cols=None, mode='append', is_stream=False, options=None, **_)
    Writes a file to a location.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| context | PipelineContext | Context in which this Action is executed. | required | 
| path | str | Location to write data to. | '' | 
| format | str | Format of files to write. | 'delta' | 
| partition_cols | list[str] | None | Columns to partition on. If None, the writer will try to get the partition columns from the metadata. Default None. | None | 
| mode | str | Specifies the behavior when data or table already exists. | 'append' | 
| is_stream | bool | If True, the data is written as a stream. | False | 
| options | dict[str, str] | None | Additional options passed to the writer. | None | 
Raises:
| Type | Description | 
|---|---|
| ValueError | If no path is provided. | 
| ValueError | If the table metadata is empty. | 
Returns:
| Type | Description | 
|---|---|
| PipelineContext | Pipeline Context |