Bases: PipelineAction
        This class implements an Append action for an ETL pipeline.
The WriteDeltaAppendAction appends a DataFrame to a Delta table.
  Example
  Write Delta Append:
    action: WRITE_DELTA_APPEND
    options:
        table_identifier: my_catalog.my_schema.my_table
        ignore_empty_df: false
 
    Returns:
    
              
                Source code in src/cloe_nessy/pipeline/actions/write_delta_append.py
                |  | class WriteDeltaAppendAction(PipelineAction):
    """This class implements an Append action for an ETL pipeline.
    The WriteDeltaAppendAction appends a Dataframe to Delta Table.
    Example:
        ```yaml
        Write Delta Append:
            action: WRITE_DELTA_APPEND
            options:
                table_identifier: my_catalog.my_schema.my_table
                ignore_empty_df: false
        ```
    Returns:
        None.
    """
    name: str = "WRITE_DELTA_APPEND"
    def run(
        self,
        context: PipelineContext,
        *,
        table_identifier: str | None = None,
        ignore_empty_df: bool = False,
        options: dict[str, Any] | None = None,
        **_: Any,
    ) -> PipelineContext:
        """Merge the dataframe into the delta table.
        Args:
            context: Context in which this Action is executed.
            table_identifier: The identifier of the table. If passed, the
                UC Adapter will be used to create a table object. Otherwise the Table
                object will be created from the table metadata in the context.
            ignore_empty_df: A flag indicating whether to ignore an empty source dataframe.
            options: Additional options for the append writer.
        Raises:
            ValueError: If the table does not exist.
            ValueError: If the data is not set in the pipeline context.
            ValueError: If the table metadata is empty.
        Returns:
            Pipeline Context
        """
        delta_append_writer = DeltaAppendWriter()
        if context.data is None:
            raise ValueError("Data is required for the append operation.")
        if context.table_metadata is None and table_identifier is None:
            raise ValueError("Table metadata or a table identifier are required for the append operation.")
        if table_identifier is not None:
            context.table_metadata = UnityCatalogAdapter().get_table_by_name(table_identifier)
        else:
            if context.table_metadata is None:
                raise ValueError("Table metadata is required.")
        if context.table_metadata is None:
            raise ValueError("Table metadata is required.")
        delta_append_writer.write(
            table_identifier=context.table_metadata.identifier,
            table_location=context.table_metadata.storage_path,
            data_frame=context.data,
            ignore_empty_df=ignore_empty_df,
            options=options,
        )
        runtime_info = getattr(context, "runtime_info", None)
        if runtime_info and runtime_info.get("is_delta_load"):
            consume_delta_load(runtime_info)
        return context.from_existing()
 | 
 
  
            run(context, *, table_identifier=None, ignore_empty_df=False, options=None, **_)
    
        Append the dataframe to the delta table.
Parameters:
    
      
        
          | Name | Type | Description | Default | 
      
      
          
            | context | PipelineContext | 
                Context in which this Action is executed. | required | 
          
            | table_identifier | str | None | 
                The identifier of the table. If passed, the
UC Adapter will be used to create a table object. Otherwise the Table
object will be created from the table metadata in the context. | None | 
          
            | ignore_empty_df | bool | 
                A flag indicating whether to ignore an empty source dataframe. | False | 
          
            | options | dict[str, Any] | None | 
                Additional options for the append writer. | None | 
      
    
Raises:
    
      
        
          | Type | Description | 
      
      
          
            | ValueError | 
                If the table does not exist. | 
          
            | ValueError | 
                If the data is not set in the pipeline context. | 
          
            | ValueError | 
                If the table metadata is empty. | 
      
    
    Returns:
    
            
              Source code in src/cloe_nessy/pipeline/actions/write_delta_append.py
              |  | def run(
    self,
    context: PipelineContext,
    *,
    table_identifier: str | None = None,
    ignore_empty_df: bool = False,
    options: dict[str, Any] | None = None,
    **_: Any,
) -> PipelineContext:
    """Merge the dataframe into the delta table.
    Args:
        context: Context in which this Action is executed.
        table_identifier: The identifier of the table. If passed, the
            UC Adapter will be used to create a table object. Otherwise the Table
            object will be created from the table metadata in the context.
        ignore_empty_df: A flag indicating whether to ignore an empty source dataframe.
        options: Additional options for the append writer.
    Raises:
        ValueError: If the table does not exist.
        ValueError: If the data is not set in the pipeline context.
        ValueError: If the table metadata is empty.
    Returns:
        Pipeline Context
    """
    delta_append_writer = DeltaAppendWriter()
    if context.data is None:
        raise ValueError("Data is required for the append operation.")
    if context.table_metadata is None and table_identifier is None:
        raise ValueError("Table metadata or a table identifier are required for the append operation.")
    if table_identifier is not None:
        context.table_metadata = UnityCatalogAdapter().get_table_by_name(table_identifier)
    else:
        if context.table_metadata is None:
            raise ValueError("Table metadata is required.")
    if context.table_metadata is None:
        raise ValueError("Table metadata is required.")
    delta_append_writer.write(
        table_identifier=context.table_metadata.identifier,
        table_location=context.table_metadata.storage_path,
        data_frame=context.data,
        ignore_empty_df=ignore_empty_df,
        options=options,
    )
    runtime_info = getattr(context, "runtime_info", None)
    if runtime_info and runtime_info.get("is_delta_load"):
        consume_delta_load(runtime_info)
    return context.from_existing()
 |