transform_union

TransformUnionAction

Bases: PipelineAction

Unions multiple DataFrames together.

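This action takes the current DataFrame from the context and unions it with the DataFrames produced by the steps listed in the union_data argument. All DataFrames must have the same schema. A ValueError is raised if union_data is missing or empty, if the context holds no data, or if any referenced step has no result or its data is None. Only None is checked: a DataFrame with zero rows is not rejected.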

Example
Union Tables:
    action: TRANSFORM_UNION
    options:
        union_data:
            - ((step: Filter First Table))
            - ((step: SQL Transform Second Table))

Referencing a DataFrame from another step

Each entry in the union_data parameter is a reference to the DataFrame produced by another step. The DataFrame is read from the result attribute of the referenced PipelineStep. The syntax for referencing the DataFrame is ((step:Step Name)); note the double parentheses.
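To make the reference syntax concrete, here is a minimal sketch of how such a string could be recognised. The pattern `STEP_REF` and the helper `parse_step_reference` are hypothetical names introduced for illustration; they are not taken from cloe_nessy, whose actual parsing may differ. The sketch assumes that whitespace after the colon is optional, since both `((step: Filter First Table))` and `((step:Step Name))` appear above.

```python
import re
from typing import Optional

# Hypothetical pattern (an assumption, not cloe_nessy's own parser):
# two opening parentheses, the keyword "step", a colon, the step name,
# and two closing parentheses. Whitespace around the name is tolerated.
STEP_REF = re.compile(r"^\(\(\s*step\s*:\s*(?P<name>.+?)\s*\)\)$")


def parse_step_reference(value: str) -> Optional[str]:
    """Return the referenced step name, or None if value is not a reference."""
    match = STEP_REF.match(value.strip())
    return match.group("name") if match else None


print(parse_step_reference("((step: Filter First Table))"))
print(parse_step_reference("((step:Step Name))"))
print(parse_step_reference("(step: Single Parentheses)"))
```

With single parentheses the helper returns None, which is the kind of mistake the note about double parentheses warns against.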

Source code in src/cloe_nessy/pipeline/actions/transform_union.py
class TransformUnionAction(PipelineAction):
    """Unions multiple DataFrames together.

    This action takes the current DataFrame from the context and unions it with
    the DataFrames specified in the `union_data` argument. All DataFrames
    must have the same schema. If the data of the context or of any step in
    `union_data` is None, a ValueError is raised.

    Example:
        ```yaml
        Union Tables:
            action: TRANSFORM_UNION
            options:
                union_data:
                    - ((step: Filter First Table))
                    - ((step: SQL Transform Second Table))
        ```
        !!! note "Referencing a DataFrame from another step"
            The `union_data` parameter is a reference to the DataFrame from another step.
            The DataFrame is accessed using the `result` attribute of the PipelineStep. The syntax
            for referencing the DataFrame is `((step:Step Name))`, mind the double parentheses.
    """

    name: str = "TRANSFORM_UNION"

    def run(
        self,
        context: PipelineContext,
        *,
        union_data: list[PipelineStep] | None = None,
        **_: Any,
    ) -> PipelineContext:
        """Unions multiple DataFrames together.

        Args:
            context: Context in which this Action is executed.
            union_data: A list of PipelineSteps that define the DataFrames
                to union with the current context.

        Raises:
            ValueError: If no union_data is provided.
            ValueError: If the data from context is None.
            ValueError: If the data from any of the union_data is None.

        Returns:
            Context after the execution of this Action.
        """
        if not union_data:
            raise ValueError("No union_data provided.")

        # Check that all union_data contexts have valid data
        result_contexts = []
        if context.data is None:
            raise ValueError("Data from the context is required for the operation.")

        for ctx in union_data:
            if ctx.result is None or ctx.result.data is None:
                raise ValueError(f"Data from the context of step '{ctx.name}' is required for the operation.")
            result_contexts.append(ctx.result.data)

        # Union all DataFrames
        union_dfs = [context.data] + result_contexts
        df = reduce(DataFrame.unionAll, union_dfs)  # type: ignore

        return context.from_existing(data=df)  # type: ignore
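The last lines of the class fold a list of DataFrames into one with `reduce(DataFrame.unionAll, union_dfs)`. The sketch below reproduces that fold without Spark, so it runs anywhere. The dictionary-based frame and the `union_all` function are hypothetical stand-ins; they imitate the documented behaviour of PySpark's `unionAll`, which matches columns by position rather than by name and keeps duplicate rows. Unlike Spark, the stand-in raises a plain ValueError on a column-count mismatch.

```python
from functools import reduce


def union_all(left: dict, right: dict) -> dict:
    """Append right's rows to left's by column position, keeping duplicates."""
    if len(left["columns"]) != len(right["columns"]):
        raise ValueError("Both frames need the same number of columns.")
    # Column names come from the left frame; the right frame's names are ignored.
    return {"columns": left["columns"], "rows": left["rows"] + right["rows"]}


# Stand-in for context.data
current = {"columns": ["id", "name"], "rows": [(1, "a")]}

# Stand-ins for the data of the steps listed in union_data
others = [
    {"columns": ["id", "name"], "rows": [(2, "b")]},
    {"columns": ["id", "name"], "rows": [(1, "a")]},  # duplicate row is kept
]

# Same shape as the fold in run(): (current + others[0]) + others[1]
result = reduce(union_all, [current] + others)
print(result["rows"])
```

Because the match is positional, two frames with the same column names in a different order would be combined without an error but with values in the wrong columns, which is why the schemas need to agree.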

run(context, *, union_data=None, **_)

Unions multiple DataFrames together.

Parameters:

    context (PipelineContext): Context in which this Action is executed. Required.
    union_data (list[PipelineStep] | None): A list of PipelineSteps that define the DataFrames to union with the current context. Default: None.

Raises:

    ValueError: If no union_data is provided.
    ValueError: If the data from context is None.
    ValueError: If the data from any of the union_data is None.

Returns:

    PipelineContext: Context after the execution of this Action.
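The three ValueError cases are checked in a fixed order, which the following sketch mirrors with stand-in classes. `FakeContext`, `FakeStep`, and `validate_union_inputs` are hypothetical names used only for illustration; they model just the `data`, `name`, and `result` attributes that `run` reads and are not part of cloe_nessy.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class FakeContext:
    """Hypothetical stand-in for PipelineContext; only `data` is modelled."""

    data: Optional[Any] = None


@dataclass
class FakeStep:
    """Hypothetical stand-in for PipelineStep; only `name` and `result` are modelled."""

    name: str
    result: Optional[FakeContext] = None


def validate_union_inputs(context, union_data):
    """Apply the same checks as run() and return the data objects to union."""
    if not union_data:  # None and an empty list are both rejected
        raise ValueError("No union_data provided.")
    if context.data is None:
        raise ValueError("Data from the context is required for the operation.")
    frames = [context.data]
    for step in union_data:
        # A step that has not produced a result, or whose result has no data
        if step.result is None or step.result.data is None:
            raise ValueError(
                f"Data from the context of step '{step.name}' is required for the operation."
            )
        frames.append(step.result.data)
    return frames


ctx = FakeContext(data=["row-1"])
try:
    validate_union_inputs(ctx, [FakeStep(name="Not Run Yet")])
except ValueError as err:
    print(err)
```

The error message names the offending step, so a reference to a step that has not produced data yet is easy to locate in the pipeline definition.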
