Pipeline

Bases: LoggerMixin

A Pipeline represents the logical unit of one ETL process.

This class manages a directed acyclic graph (DAG) of steps, ensuring that each step is executed in the correct order based on dependencies.

Attributes:

name (str): The name of the pipeline.
steps (OrderedDict[str, PipelineStep]): An ordered dictionary of PipelineSteps that are part of the pipeline.

Source code in src/cloe_nessy/pipeline/pipeline.py
class Pipeline(LoggerMixin):
    """A Pipeline represents the logical unit of one ETL process.

    This class manages a directed acyclic graph (DAG) of steps, ensuring that
    each step is executed in the correct order based on dependencies.

    Attributes:
        name: The name of the pipeline.
        steps: An ordered dictionary of PipelineSteps that are part of the pipeline.
    """

    def __init__(self, name: str, steps: OrderedDict[str, "PipelineStep"] | None = None) -> None:
        self.name: str = name
        self.steps: OrderedDict[str, PipelineStep] = steps if steps is not None else OrderedDict()
        self._console_logger = self.get_console_logger()
        self._graph: nx.DiGraph = self._create_graph()
        self._lock: Lock = Lock()

    @property
    def graph(self) -> nx.DiGraph:
        """Get the pipeline graph."""
        return self._graph

    def _create_graph(self) -> nx.DiGraph:
        """Creates a directed acyclic graph (DAG) representing the pipeline steps and their dependencies.

        Each node in the graph represents a single step in the pipeline, and each edge represents a dependency.
        """
        g: nx.DiGraph = nx.DiGraph()
        g.add_nodes_from(set([s.name for s in self.steps.values()]))
        g.add_edges_from(set([(p, s.name) for s in self.steps.values() for p in s._predecessors if p]))

        self._console_logger.debug(f"Graph created with {g.number_of_nodes()} nodes and {g.number_of_edges()} edges.")
        return g

    def _run_step(self, step_name: str) -> None:
        """Executes the run method of the corresponding step in the pipeline."""
        step = self.steps[step_name]

        # Handle context and metadata references
        if step._context_ref:
            step.context = self.steps[step._context_ref].result
        if step._table_metadata_ref:
            step.context.table_metadata = self.steps[step._table_metadata_ref].result.table_metadata

        try:
            self._console_logger.info(f"Starting execution of step: {step.name}")
            step.run()
        except Exception as err:
            self._console_logger.error(f"Execution of step {step.name} failed with error: {str(err)}")
            raise err
        else:
            self._console_logger.info(f"Execution of step {step.name} succeeded.")

    def _get_ready_to_run_steps(self, remaining_steps: list[str], g: nx.DiGraph) -> set[str]:
        """Identifies and returns the steps that are ready to run.

        This method checks the directed acyclic graph (DAG) to find steps that have no predecessors,
        indicating that they are ready to be executed. It logs the remaining steps and the steps that
        are ready to run.

        Args:
            remaining_steps: A list of step IDs that are yet to be executed.
            g: The directed acyclic graph representing the pipeline.

        Returns:
            A set of step IDs that are ready to be executed.
        """
        with self._lock:
            ready_to_run = set([step for step in remaining_steps if g.in_degree(step) == 0])
            self._console_logger.debug(f"Remaining steps: {remaining_steps}")
            self._console_logger.debug(f"Ready to run: {ready_to_run}")
            return ready_to_run

    def _submit_ready_steps(
        self, ready_to_run: set[str], remaining_steps: list[str], executor: ThreadPoolExecutor, futures: dict
    ):
        """Submits the ready-to-run steps to the executor for execution.

        This method takes the steps that are ready to run, removes them from the list of remaining steps,
        and submits them to the executor for concurrent execution. It also updates the futures dictionary
        to keep track of the submitted tasks.

        Args:
            ready_to_run: A set of steps that are ready to be executed.
            remaining_steps: A list of steps that are yet to be executed.
            executor: The executor that manages the concurrent execution of steps.
            futures: A dictionary mapping futures to their corresponding step ID.
        """
        with self._lock:
            for step in ready_to_run:
                self._console_logger.debug(f"Submitting: {step}")
                remaining_steps.remove(step)
                future = executor.submit(self._run_step, step)
                futures[future] = step

    def _handle_completed_tasks(self, futures, g, remaining_steps):
        """Handles the completion of tasks in the pipeline.

        This method processes the futures that have completed execution. It removes the corresponding
        steps from the directed acyclic graph (DAG) and checks if new steps are ready to run. If new
        steps are ready, it returns True to indicate that the pipeline can continue execution.

        Args:
            futures: A dictionary mapping futures to their corresponding steps.
            g: The directed acyclic graph representing the pipeline.
            remaining_steps: A list of steps that are yet to be executed.

        Returns:
            True if new steps are ready to run, False otherwise.
        """
        # Wait for tasks to complete and free up dependencies
        for future in as_completed(futures):
            future.result()  # re-raises the step's exception, if any; remaining futures are cancelled in run()'s finally block
            step = futures[future]
            del futures[future]
            with self._lock:
                g.remove_node(step)  # Mark the step as completed by removing it from the graph.
            if len(set([step for step in remaining_steps if g.in_degree(step) == 0])) > 0:
                self._console_logger.debug("New steps ready to run")
                return True
        self._console_logger.debug("No more steps to run")
        return False

    def _trim_graph(self, graph, until):
        """Trims the pipeline graph to only include steps up to the specified 'until' step (excluding).

        This method first verifies that the given step exists in the graph. It then finds all ancestors
        (i.e., steps that precede the 'until' step) and creates a subgraph consisting solely of those steps.

        Args:
            graph: The complete directed acyclic graph representing the pipeline.
            until: The identifier of the step up to which the graph should be trimmed.

        Returns:
            A subgraph containing only the steps leading to (and including) the 'until' step.

        Raises:
            ValueError: If the specified 'until' step is not found in the graph.
        """
        if until not in graph.nodes:
            raise ValueError(f"Step '{until}' not found in the pipeline.")

        predecessors = set(nx.ancestors(graph, until))
        predecessors.add(until)

        trimmed_graph = graph.subgraph(predecessors).copy()
        return trimmed_graph

    def run(self, until: str | None = None) -> None:
        """Executes the pipeline steps in the correct order based on dependencies.

        This method creates a directed acyclic graph (DAG) of the pipeline steps and, if specified, trims
        the graph to only include steps up to and including the given 'until' step. It then concurrently
        executes steps with no pending dependencies using a ThreadPoolExecutor, ensuring that all steps
        run in dependency order. If a cyclic dependency is detected, or if any step fails during
        execution, the method raises an error.

        Args:
            until: Optional; the identifier of the step up to which the pipeline should be executed.

        Raises:
            RuntimeError: If a cyclic dependency is detected.
            Exception: Propagates any error raised during the execution of a step.
        """
        g = self._create_graph()

        if until is not None:
            g = self._trim_graph(g, until)

        remaining_steps = list(g.nodes())
        self._console_logger.info(f"Pipeline [' {self.name} '] started with {len(remaining_steps)} steps.")

        with ThreadPoolExecutor(max_workers=int(os.environ.get("NESSY_MAX_WORKERS", 1))) as executor:
            futures: dict = {}
            try:
                self._console_logger.debug(f"Remaining steps: {remaining_steps}")
                while remaining_steps:
                    ready_to_run = self._get_ready_to_run_steps(remaining_steps, g)
                    if not ready_to_run:
                        # If there are still steps to be executed, but all of them have predecessors there
                        # must be a cyclic dependency in the graph.
                        self._console_logger.error(
                            f"Cyclic dependency detected in the pipeline. Remaining steps: {remaining_steps}"
                        )
                        raise RuntimeError("Cyclic dependency detected in the pipeline!")

                    self._submit_ready_steps(ready_to_run, remaining_steps, executor, futures)

                    if self._handle_completed_tasks(futures, g, remaining_steps):
                        continue
            except RuntimeError as e:
                self._console_logger.error(f"Pipeline [' {self.name} '] failed due to cyclic dependency: {str(e)}")
                raise e
            except Exception as e:
                self._console_logger.error(f"Pipeline [' {self.name} '] failed: {str(e)}")
                raise e
            finally:
                # cancel any remaining futures (a no-op for completed ones; on error this stops still-pending steps)
                for future in futures:
                    future.cancel()  # Cancel remaining futures
                self._graph = self._create_graph()  # recreate the graph after the run
        self._console_logger.info(f"Pipeline [' {self.name} '] completed successfully.")

    def plot_graph(self, save_path: str | None = None) -> None:
        """Generates a visual representation of the pipeline steps and their dependencies.

        This method uses the PipelinePlottingService to create a plot of the pipeline graph. If a save path
        is specified, the plot will be saved to that location; otherwise, it will be displayed interactively.

        Args:
            save_path: Optional; the file path where the plot should be saved. If None, the plot is displayed interactively.
        """
        plotting_service = PipelinePlottingService()
        plotting_service.plot_graph(self, save_path)
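
A minimal end-to-end sketch, assuming the package imports below resolve and that extract_action and transform_action are instances of some PipelineAction subclass (all step and action names here are illustrative, not part of the source above):

from collections import OrderedDict

from cloe_nessy.pipeline.pipeline import Pipeline
from cloe_nessy.pipeline.pipeline_step import PipelineStep

# Two hypothetical steps; 'transform' consumes the result context of 'extract'.
extract = PipelineStep(name="extract", action=extract_action)
transform = PipelineStep(name="transform", action=transform_action, _context_ref="extract")

pipeline = Pipeline(
    name="demo_pipeline",
    steps=OrderedDict([("extract", extract), ("transform", transform)]),
)
pipeline.run()  # runs 'extract' first, then 'transform'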

graph property

Get the pipeline graph.

_create_graph()

Creates a directed acyclic graph (DAG) representing the pipeline steps and their dependencies.

Each node in the graph represents a single step in the pipeline, and each edge represents a dependency.

Source code in src/cloe_nessy/pipeline/pipeline.py
def _create_graph(self) -> nx.DiGraph:
    """Creates a directed acyclic graph (DAG) representing the pipeline steps and their dependencies.

    Each node in the graph represents a single step in the pipeline, and each edge represents a dependency.
    """
    g: nx.DiGraph = nx.DiGraph()
    g.add_nodes_from(set([s.name for s in self.steps.values()]))
    g.add_edges_from(set([(p, s.name) for s in self.steps.values() for p in s._predecessors if p]))

    self._console_logger.debug(f"Graph created with {g.number_of_nodes()} nodes and {g.number_of_edges()} edges.")
    return g
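
The same construction can be reproduced with plain networkx, which makes the node/edge mapping explicit. A standalone sketch with hypothetical step names:

import networkx as nx

# Each step maps to the set of step names it depends on (its _predecessors).
predecessors = {"extract": set(), "transform": {"extract"}, "load": {"transform"}}

g = nx.DiGraph()
g.add_nodes_from(predecessors)
g.add_edges_from((p, step) for step, preds in predecessors.items() for p in preds)

print(list(nx.topological_sort(g)))  # ['extract', 'transform', 'load']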

_get_ready_to_run_steps(remaining_steps, g)

Identifies and returns the steps that are ready to run.

This method checks the directed acyclic graph (DAG) to find steps that have no predecessors, indicating that they are ready to be executed. It logs the remaining steps and the steps that are ready to run.

Parameters:

remaining_steps (list[str], required): A list of step IDs that are yet to be executed.
g (DiGraph, required): The directed acyclic graph representing the pipeline.

Returns:

set[str]: A set of step IDs that are ready to be executed.

Source code in src/cloe_nessy/pipeline/pipeline.py
def _get_ready_to_run_steps(self, remaining_steps: list[str], g: nx.DiGraph) -> set[str]:
    """Identifies and returns the steps that are ready to run.

    This method checks the directed acyclic graph (DAG) to find steps that have no predecessors,
    indicating that they are ready to be executed. It logs the remaining steps and the steps that
    are ready to run.

    Args:
        remaining_steps: A list of step IDs that are yet to be executed.
        g: The directed acyclic graph representing the pipeline.

    Returns:
        A set of step IDs that are ready to be executed.
    """
    with self._lock:
        ready_to_run = set([step for step in remaining_steps if g.in_degree(step) == 0])
        self._console_logger.debug(f"Remaining steps: {remaining_steps}")
        self._console_logger.debug(f"Ready to run: {ready_to_run}")
        return ready_to_run
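
The readiness test in isolation: a node with in-degree zero has no unfinished predecessors left in the graph, so it can run immediately. A standalone sketch with hypothetical step names:

import networkx as nx

g = nx.DiGraph([("extract", "transform"), ("transform", "load")])
remaining_steps = ["extract", "transform", "load"]

ready_to_run = {step for step in remaining_steps if g.in_degree(step) == 0}
print(ready_to_run)  # {'extract'} -- the only step with no pending predecessors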

_handle_completed_tasks(futures, g, remaining_steps)

Handles the completion of tasks in the pipeline.

This method processes the futures that have completed execution. It removes the corresponding steps from the directed acyclic graph (DAG) and checks if new steps are ready to run. If new steps are ready, it returns True to indicate that the pipeline can continue execution.

Parameters:

futures (required): A dictionary mapping futures to their corresponding steps.
g (required): The directed acyclic graph representing the pipeline.
remaining_steps (required): A list of steps that are yet to be executed.

Returns:

True if new steps are ready to run, False otherwise.

Source code in src/cloe_nessy/pipeline/pipeline.py
def _handle_completed_tasks(self, futures, g, remaining_steps):
    """Handles the completion of tasks in the pipeline.

    This method processes the futures that have completed execution. It removes the corresponding
    steps from the directed acyclic graph (DAG) and checks if new steps are ready to run. If new
    steps are ready, it returns True to indicate that the pipeline can continue execution.

    Args:
        futures: A dictionary mapping futures to their corresponding steps.
        g: The directed acyclic graph representing the pipeline.
        remaining_steps: A list of steps that are yet to be executed.

    Returns:
        True if new steps are ready to run, False otherwise.
    """
    # Wait for tasks to complete and free up dependencies
    for future in as_completed(futures):
        future.result()  # re-raises the step's exception, if any; remaining futures are cancelled in run()'s finally block
        step = futures[future]
        del futures[future]
        with self._lock:
            g.remove_node(step)  # Mark the step as completed by removing it from the graph.
        if len(set([step for step in remaining_steps if g.in_degree(step) == 0])) > 0:
            self._console_logger.debug("New steps ready to run")
            return True
    self._console_logger.debug("No more steps to run")
    return False
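
The underlying pattern, stripped of pipeline specifics: as_completed yields futures as they finish, result() re-raises any exception from the task, and removing the finished node from the graph frees its successors. A standalone sketch with a hypothetical two-step graph:

from concurrent.futures import ThreadPoolExecutor, as_completed

import networkx as nx

g = nx.DiGraph([("extract", "transform")])  # 'transform' waits on 'extract'

with ThreadPoolExecutor(max_workers=2) as executor:
    futures = {executor.submit(lambda: None): "extract"}
    for future in as_completed(list(futures)):
        future.result()                 # re-raises if the task failed
        g.remove_node(futures[future])  # completing 'extract' frees 'transform'

print(g.in_degree("transform"))  # 0 -- 'transform' is now ready to run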

_run_step(step_name)

Executes the run method of the corresponding step in the pipeline.

Source code in src/cloe_nessy/pipeline/pipeline.py
def _run_step(self, step_name: str) -> None:
    """Executes the run method of the corresponding step in the pipeline."""
    step = self.steps[step_name]

    # Handle context and metadata references
    if step._context_ref:
        step.context = self.steps[step._context_ref].result
    if step._table_metadata_ref:
        step.context.table_metadata = self.steps[step._table_metadata_ref].result.table_metadata

    try:
        self._console_logger.info(f"Starting execution of step: {step.name}")
        step.run()
    except Exception as err:
        self._console_logger.error(f"Execution of step {step.name} failed with error: {str(err)}")
        raise err
    else:
        self._console_logger.info(f"Execution of step {step.name} succeeded.")

_submit_ready_steps(ready_to_run, remaining_steps, executor, futures)

Submits the ready-to-run steps to the executor for execution.

This method takes the steps that are ready to run, removes them from the list of remaining steps, and submits them to the executor for concurrent execution. It also updates the futures dictionary to keep track of the submitted tasks.

Parameters:

ready_to_run (set[str], required): A set of steps that are ready to be executed.
remaining_steps (list[str], required): A list of steps that are yet to be executed.
executor (ThreadPoolExecutor, required): The executor that manages the concurrent execution of steps.
futures (dict, required): A dictionary mapping futures to their corresponding step ID.

Source code in src/cloe_nessy/pipeline/pipeline.py
def _submit_ready_steps(
    self, ready_to_run: set[str], remaining_steps: list[str], executor: ThreadPoolExecutor, futures: dict
):
    """Submits the ready-to-run steps to the executor for execution.

    This method takes the steps that are ready to run, removes them from the list of remaining steps,
    and submits them to the executor for concurrent execution. It also updates the futures dictionary
    to keep track of the submitted tasks.

    Args:
        ready_to_run: A set of steps that are ready to be executed.
        remaining_steps: A list of steps that are yet to be executed.
        executor: The executor that manages the concurrent execution of steps.
        futures: A dictionary mapping futures to their corresponding step ID.
    """
    with self._lock:
        for step in ready_to_run:
            self._console_logger.debug(f"Submitting: {step}")
            remaining_steps.remove(step)
            future = executor.submit(self._run_step, step)
            futures[future] = step

_trim_graph(graph, until)

Trims the pipeline graph to only include steps up to and including the specified 'until' step.

This method first verifies that the given step exists in the graph. It then finds all ancestors (i.e., steps that precede the 'until' step) and keeps the subgraph consisting of those ancestors plus the 'until' step itself.

Parameters:

graph (required): The complete directed acyclic graph representing the pipeline.
until (required): The identifier of the step up to which the graph should be trimmed.

Returns:

A subgraph containing only the steps leading to (and including) the 'until' step.

Raises:

ValueError: If the specified 'until' step is not found in the graph.

Source code in src/cloe_nessy/pipeline/pipeline.py
def _trim_graph(self, graph, until):
    """Trims the pipeline graph to only include steps up to the specified 'until' step (excluding).

    This method first verifies that the given step exists in the graph. It then finds all ancestors
    (i.e., steps that precede the 'until' step) and creates a subgraph consisting solely of those steps.

    Args:
        graph: The complete directed acyclic graph representing the pipeline.
        until: The identifier of the step up to which the graph should be trimmed.

    Returns:
        A subgraph containing only the steps leading to (and including) the 'until' step.

    Raises:
        ValueError: If the specified 'until' step is not found in the graph.
    """
    if until not in graph.nodes:
        raise ValueError(f"Step '{until}' not found in the pipeline.")

    predecessors = set(nx.ancestors(graph, until))
    predecessors.add(until)

    trimmed_graph = graph.subgraph(predecessors).copy()
    return trimmed_graph
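
nx.ancestors plus subgraph() does all the work here: ancestors returns every node with a path to 'until', and the copied subgraph keeps exactly those nodes plus 'until' itself. A standalone sketch with hypothetical step names:

import networkx as nx

g = nx.DiGraph([("extract", "transform"), ("transform", "load"), ("load", "report")])

keep = set(nx.ancestors(g, "load")) | {"load"}
trimmed = g.subgraph(keep).copy()

print(sorted(trimmed.nodes))  # ['extract', 'load', 'transform'] -- 'report' is dropped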

plot_graph(save_path=None)

Generates a visual representation of the pipeline steps and their dependencies.

This method uses the PipelinePlottingService to create a plot of the pipeline graph. If a save path is specified, the plot will be saved to that location; otherwise, it will be displayed interactively.

Parameters:

save_path (str | None, default None): Optional; the file path where the plot should be saved. If None, the plot is displayed interactively.

Source code in src/cloe_nessy/pipeline/pipeline.py
def plot_graph(self, save_path: str | None = None) -> None:
    """Generates a visual representation of the pipeline steps and their dependencies.

    This method uses the PipelinePlottingService to create a plot of the pipeline graph. If a save path
    is specified, the plot will be saved to that location; otherwise, it will be displayed interactively.

    Args:
        save_path: Optional; the file path where the plot should be saved. If None, the plot is displayed interactively.
    """
    plotting_service = PipelinePlottingService()
    plotting_service.plot_graph(self, save_path)

run(until=None)

Executes the pipeline steps in the correct order based on dependencies.

This method creates a directed acyclic graph (DAG) of the pipeline steps and, if specified, trims the graph to only include steps up to and including the given 'until' step. It then concurrently executes steps with no pending dependencies using a ThreadPoolExecutor, ensuring that all steps run in dependency order. If a cyclic dependency is detected, or if any step fails during execution, the method raises an error.

Parameters:

until (str | None, default None): Optional; the identifier of the step up to which the pipeline should be executed.

Raises:

RuntimeError: If a cyclic dependency is detected.
Exception: Propagates any error raised during the execution of a step.

Source code in src/cloe_nessy/pipeline/pipeline.py
def run(self, until: str | None = None) -> None:
    """Executes the pipeline steps in the correct order based on dependencies.

    This method creates a directed acyclic graph (DAG) of the pipeline steps and, if specified, trims
    the graph to only include steps up to and including the given 'until' step. It then concurrently
    executes steps with no pending dependencies using a ThreadPoolExecutor, ensuring that all steps
    run in dependency order. If a cyclic dependency is detected, or if any step fails during
    execution, the method raises an error.

    Args:
        until: Optional; the identifier of the step up to which the pipeline should be executed.

    Raises:
        RuntimeError: If a cyclic dependency is detected.
        Exception: Propagates any error raised during the execution of a step.
    """
    g = self._create_graph()

    if until is not None:
        g = self._trim_graph(g, until)

    remaining_steps = list(g.nodes())
    self._console_logger.info(f"Pipeline [' {self.name} '] started with {len(remaining_steps)} steps.")

    with ThreadPoolExecutor(max_workers=int(os.environ.get("NESSY_MAX_WORKERS", 1))) as executor:
        futures: dict = {}
        try:
            self._console_logger.debug(f"Remaining steps: {remaining_steps}")
            while remaining_steps:
                ready_to_run = self._get_ready_to_run_steps(remaining_steps, g)
                if not ready_to_run:
                    # If there are still steps to be executed, but all of them have predecessors there
                    # must be a cyclic dependency in the graph.
                    self._console_logger.error(
                        f"Cyclic dependency detected in the pipeline. Remaining steps: {remaining_steps}"
                    )
                    raise RuntimeError("Cyclic dependency detected in the pipeline!")

                self._submit_ready_steps(ready_to_run, remaining_steps, executor, futures)

                if self._handle_completed_tasks(futures, g, remaining_steps):
                    continue
        except RuntimeError as e:
            self._console_logger.error(f"Pipeline [' {self.name} '] failed due to cyclic dependency: {str(e)}")
            raise e
        except Exception as e:
            self._console_logger.error(f"Pipeline [' {self.name} '] failed: {str(e)}")
            raise e
        finally:
            # cancel any remaining futures (a no-op for completed ones; on error this stops still-pending steps)
            for future in futures:
                future.cancel()  # Cancel remaining futures
            self._graph = self._create_graph()  # recreate the graph after the run
    self._console_logger.info(f"Pipeline [' {self.name} '] completed successfully.")

PipelineAction

Bases: ABC, LoggerMixin

Models the operation being executed against an Input.

Attributes:

name (str): The name of the action.

Source code in src/cloe_nessy/pipeline/pipeline_action.py
class PipelineAction(ABC, LoggerMixin, metaclass=PipelineActionMeta):
    """Models the operation being executed against an Input.

    Attributes:
        name: The name of the action.
    """

    name: str

    def __init__(self, tabular_logger: logging.Logger | None = None) -> None:
        """Initializes the PipelineAction object.

        Args:
            tabular_logger: The tabular logger to use for dependency injection.
        """
        self._console_logger = self.get_console_logger()
        self._tabular_logger = tabular_logger or self.get_tabular_logger(
            logger_name="Tabular:PipelineAction",
            uc_table_name=PipelineActionLogs().table_name,
            uc_table_columns=PipelineActionLogs().columns,
            log_type=PipelineActionLogs().log_type,
        )

    def __str__(self) -> str:
        return self.__class__.__name__

    @abstractmethod
    def run(self, context: PipelineContext, **kwargs: Any) -> PipelineContext:
        """Execute the pipeline action."""
        pass

__init__(tabular_logger=None)

Initializes the PipelineAction object.

Parameters:

tabular_logger (Logger | None, default None): The tabular logger to use for dependency injection.

Source code in src/cloe_nessy/pipeline/pipeline_action.py
def __init__(self, tabular_logger: logging.Logger | None = None) -> None:
    """Initializes the PipelineAction object.

    Args:
        tabular_logger: The tabular logger to use for dependency injection.
    """
    self._console_logger = self.get_console_logger()
    self._tabular_logger = tabular_logger or self.get_tabular_logger(
        logger_name="Tabular:PipelineAction",
        uc_table_name=PipelineActionLogs().table_name,
        uc_table_columns=PipelineActionLogs().columns,
        log_type=PipelineActionLogs().log_type,
    )

run(context, **kwargs) abstractmethod

Execute the pipeline action.

Source code in src/cloe_nessy/pipeline/pipeline_action.py
@abstractmethod
def run(self, context: PipelineContext, **kwargs: Any) -> PipelineContext:
    """Execute the pipeline action."""
    pass
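
A hypothetical subclass to illustrate the interface: a class-level name plus a run() implementation that returns a new context. The column renaming below assumes context.data is a Spark DataFrame:

from typing import Any

from cloe_nessy.pipeline.pipeline_action import PipelineAction
from cloe_nessy.pipeline.pipeline_context import PipelineContext


class UppercaseColumnsAction(PipelineAction):
    """Renames every column of the context's DataFrame to upper case."""

    name: str = "UPPERCASE_COLUMNS"

    def run(self, context: PipelineContext, **kwargs: Any) -> PipelineContext:
        df = context.data
        renamed = df.toDF(*[c.upper() for c in df.columns]) if df is not None else None
        return context.from_existing(data=renamed)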

PipelineContext

A class that models the context of a pipeline.

The context consists of Table Metadata (the Table definition) and the actual data as a DataFrame.

Attributes:

table_metadata: The Nessy-Table definition.
data: The data of the context.
runtime_info: Additional runtime information, e.g. streaming status.
status: The status of the context. Can be "initialized", "successful", or "failed".

Note

This is not a pydantic class, because Fabric does not support the type ConnectDataFrame.

Source code in src/cloe_nessy/pipeline/pipeline_context.py
class PipelineContext:
    """A class that models the context of a pipeline.

    The context consists of Table Metadata (the Table definition) and the actual data
    as a DataFrame.

    Attributes:
        table_metadata: The Nessy-Table definition.
        data: The data of the context.
        runtime_info: Additional runtime information, e.g. streaming status.
        status: The status of the context. Can be "initialized", "successful" or
            "failed".

    Note:
        This is not a pydantic class, because Fabric does not support the type ConnectDataFrame.
    """

    def __init__(
        self,
        table_metadata: Table | None = None,
        data: DataFrame | None = None,
        runtime_info: dict[str, Any] | None = None,
        status: str = "initialized",
    ) -> None:
        self.table_metadata = table_metadata
        self.data = data
        self.runtime_info = runtime_info if runtime_info is not None else {}
        self.status = status

    def from_existing(
        self,
        table_metadata: Table | None = None,
        data: DataFrame | None = None,
        runtime_info: dict[str, Any] | None = None,
    ) -> "PipelineContext":
        """Creates a new PipelineContext from an existing one.

        Args:
            table_metadata: The metadata of the new context.
            data: The data of the new context.
            runtime_info: The runtime_info of the new context.

        Returns:
            The new PipelineContext.
        """
        final_metadata = table_metadata or self.table_metadata
        final_data = data or self.data
        final_runtime_info = runtime_info or self.runtime_info or {}
        return PipelineContext(table_metadata=final_metadata, data=final_data, runtime_info=final_runtime_info)

from_existing(table_metadata=None, data=None, runtime_info=None)

Creates a new PipelineContext from an existing one.

Parameters:

table_metadata (Table | None, default None): The metadata of the new context.
data (DataFrame | None, default None): The data of the new context.
runtime_info (dict[str, Any] | None, default None): The runtime_info of the new context.

Returns:

PipelineContext: The new PipelineContext.

Source code in src/cloe_nessy/pipeline/pipeline_context.py
def from_existing(
    self,
    table_metadata: Table | None = None,
    data: DataFrame | None = None,
    runtime_info: dict[str, Any] | None = None,
) -> "PipelineContext":
    """Creates a new PipelineContext from an existing one.

    Args:
        table_metadata: The metadata of the new context.
        data: The data of the new context.
        runtime_info: The runtime_info of the new context.

    Returns:
        The new PipelineContext.
    """
    final_metadata = table_metadata or self.table_metadata
    final_data = data or self.data
    final_runtime_info = runtime_info or self.runtime_info or {}
    return PipelineContext(table_metadata=final_metadata, data=final_data, runtime_info=final_runtime_info)
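
A small illustration of the merge semantics: arguments override the existing values, anything omitted is carried over, and status is reset to "initialized" because it is never passed through:

from cloe_nessy.pipeline.pipeline_context import PipelineContext

base = PipelineContext(runtime_info={"is_streaming": False}, status="successful")
derived = base.from_existing(runtime_info={"is_streaming": True})

print(derived.runtime_info)  # {'is_streaming': True} -- overridden
print(derived.status)        # 'initialized' -- status is not carried over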

PipelineParsingService

A service class that parses a YAML document or string into a Pipeline object.

Source code in src/cloe_nessy/pipeline/pipeline_parsing_service.py
class PipelineParsingService:
    """A service class that parses a YAML document or string into a Pipeline object."""

    def __init__(self, custom_actions=None):
        if custom_actions is not None:
            for action in custom_actions:
                self.register_pipeline_action(action)

    @staticmethod
    def register_pipeline_action(pipeline_action_class):
        """Registers a custom pipeline action class.

        !!! note
            Registering an action enables the custom action to be used in the
            pipeline YAML definition. This method is called automatically when the
            PipelineParsingService is instantiated with a list of custom actions.
        """
        console_logger = LoggerMixin().get_console_logger()
        console_logger.info("Registering custom pipeline action [' %s ']", pipeline_action_class.name)
        pipeline_actions[pipeline_action_class.name] = pipeline_action_class

        global PipelineActionType
        PipelineActionType = Enum("PipelineActionType", pipeline_actions)

    @staticmethod
    def parse(path: Path | None = None, yaml_str: str | None = None) -> Pipeline:
        """Reads the YAML from a given Path and returns a Pipeline object.

        Args:
            path: Path to the YAML document.
            yaml_str: A string that can be parsed in YAML format.

        Raises:
            ValueError: If neither 'path' nor 'yaml_str' has been provided.

        Returns:
            Pipeline: The resulting Pipeline instance.
        """
        console_logger = LoggerMixin().get_console_logger()
        if not path and not yaml_str:
            raise ValueError("Neither 'file_path' nor 'yaml_str' was provided. Please supply one of them.")
        if path:
            path_obj = Path(path)
            with open(path_obj) as f:
                yaml_str = f.read()
        if not yaml_str:
            raise ValueError("YAML content is empty.")

        final_yaml_str = PipelineParsingService._replace_variables(yaml_str)
        config = yaml.safe_load(final_yaml_str)
        pipeline_config = PipelineConfig.metadata_to_instance(config)
        steps = PipelineParsingService._get_steps(pipeline_config.steps)
        pipeline = Pipeline(name=pipeline_config.name, steps=steps)  # type: ignore
        console_logger.info("Pipeline [ '%s' ] parsed successfully with %d steps.", pipeline.name, len(pipeline.steps))
        return pipeline

    @staticmethod
    def _replace_variables(yaml_str: str) -> str:
        """Replace variable placeholders in a YAML string.

        Replaces environment variable references matching the pattern `{{env:var-name}}`,
        where var-name is the name of the environment variable. Replaces secret
        references matching the pattern `{{secret-scope-name:secret-key}}`, where
        scope-name is the name of the secret scope and secret-key is the key of
        the secret.

        Args:
            yaml_str: A string that can be parsed in YAML format.

        Returns:
            The same YAML string with environment variable and secret placeholders replaced.
        """
        env_var_pattern = r"\{\{env:([^}]+)\}\}"
        secret_ref_pattern = r"\{\{(?!step|env)([^}]+):([^}]+)\}\}"

        def replace_with_env_var(match):
            env_var_name = match.group(1)
            env_var_value = os.getenv(env_var_name)
            return env_var_value

        def replace_with_secret(match):
            secret_scope_name = match.group(1)
            secret_key = match.group(2)
            return SessionManager.get_utils().secrets.get(scope=secret_scope_name, key=secret_key)

        env_replaced_yaml_string = re.sub(env_var_pattern, replace_with_env_var, yaml_str)
        final_yaml_string = re.sub(secret_ref_pattern, replace_with_secret, env_replaced_yaml_string)
        return final_yaml_string

    @staticmethod
    def _get_steps(step_configs, last_step_name: str | None = None):
        steps = OrderedDict()
        for step_name, step_config in step_configs.items():
            is_successor = step_config.is_successor
            context_ref = step_config.context
            if is_successor and not context_ref:
                context_ref = last_step_name
            action = PipelineActionType[step_config.action.name].value()
            step = PipelineStep(
                name=step_name,
                action=action,
                options=step_config.options,
                _context_ref=context_ref,
                _table_metadata_ref=step_config.table_metadata,
            )
            steps[step.name] = step
            last_step_name = step_name
        for step in steps.values():
            steps[step.name] = PipelineParsingService._replace_step_refs(steps, step)
        return steps

    @staticmethod
    def _replace_step_refs(steps: OrderedDict[str, PipelineStep], step: PipelineStep) -> PipelineStep:
        step_ref_pattern = r"\(\(step:([^)]+)\)\)"

        def _handle_string_value(value: str, option: str):
            if match := re.match(step_ref_pattern, value):
                dependency_step_name = match.group(1)
                dependency_step = steps.get(dependency_step_name)
                step.options[option] = dependency_step
                step._predecessors.add(dependency_step_name)

        def _handle_list_value(value: list, option: str):
            for i, v in enumerate(value):
                if isinstance(v, str):
                    if match := re.match(step_ref_pattern, v):
                        dependency_step_name = match.group(1)
                        dependency_step = steps.get(dependency_step_name)
                        step.options[option][i] = dependency_step
                        step._predecessors.add(dependency_step_name)

        if step.options:
            for option, value in step.options.items():
                if isinstance(value, str):
                    _handle_string_value(value, option)
                elif isinstance(value, list):
                    _handle_list_value(value, option)

        return step

_replace_variables(yaml_str) staticmethod

Replace variable placeholders in a YAML string.

Replaces environment variable references matching the pattern {{env:var-name}}, where var-name is the name of the environment variable. Replaces secret references matching the pattern {{secret-scope-name:secret-key}}, where scope-name is the name of the secret scope and secret-key is the key of the secret.

Parameters:

yaml_str (str, required): A string that can be parsed in YAML format.

Returns:

str: The same YAML string with environment variable and secret placeholders replaced.

Source code in src/cloe_nessy/pipeline/pipeline_parsing_service.py
@staticmethod
def _replace_variables(yaml_str: str) -> str:
    """Replace variable placeholders in a YAML string.

    Replaces environment variable references matching the pattern `{{env:var-name}}`,
    where var-name is the name of the environment variable. Replaces secret
    references matching the pattern `{{secret-scope-name:secret-key}}`, where
    scope-name is the name of the secret scope and secret-key is the key of
    the secret.

    Args:
        yaml_str: A string that can be parsed in YAML format.

    Returns:
        The same YAML string with environment variable and secret placeholders replaced.
    """
    env_var_pattern = r"\{\{env:([^}]+)\}\}"
    secret_ref_pattern = r"\{\{(?!step|env)([^}]+):([^}]+)\}\}"

    def replace_with_env_var(match):
        env_var_name = match.group(1)
        env_var_value = os.getenv(env_var_name)
        return env_var_value

    def replace_with_secret(match):
        secret_scope_name = match.group(1)
        secret_key = match.group(2)
        return SessionManager.get_utils().secrets.get(scope=secret_scope_name, key=secret_key)

    env_replaced_yaml_string = re.sub(env_var_pattern, replace_with_env_var, yaml_str)
    final_yaml_string = re.sub(secret_ref_pattern, replace_with_secret, env_replaced_yaml_string)
    return final_yaml_string
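
The env-var branch in isolation; MY_CONTAINER is a hypothetical variable name:

import os
import re

os.environ["MY_CONTAINER"] = "landing-zone"

env_var_pattern = r"\{\{env:([^}]+)\}\}"
yaml_str = "location: abfss://{{env:MY_CONTAINER}}@account.dfs.core.windows.net/"

print(re.sub(env_var_pattern, lambda m: os.getenv(m.group(1)), yaml_str))
# location: abfss://landing-zone@account.dfs.core.windows.net/

Note that os.getenv returns None for an unset variable, which re.sub rejects with a TypeError, so every referenced variable must be defined.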

parse(path=None, yaml_str=None) staticmethod

Reads YAML from a given path or string and returns a Pipeline object.

Parameters:

path (Path | None, default None): Path to the YAML document.
yaml_str (str | None, default None): A string that can be parsed in YAML format.

Raises:

ValueError: If neither 'path' nor 'yaml_str' has been provided.

Returns:

Pipeline: The resulting Pipeline instance.

Source code in src/cloe_nessy/pipeline/pipeline_parsing_service.py
@staticmethod
def parse(path: Path | None = None, yaml_str: str | None = None) -> Pipeline:
    """Reads the YAML from a given Path and returns a Pipeline object.

    Args:
        path: Path to the YAML document.
        yaml_str: A string that can be parsed in YAML format.

    Raises:
        ValueError: If neither 'path' nor 'yaml_str' has been provided.

    Returns:
        Pipeline: The resulting Pipeline instance.
    """
    console_logger = LoggerMixin().get_console_logger()
    if not path and not yaml_str:
        raise ValueError("Neither 'file_path' nor 'yaml_str' was provided. Please supply one of them.")
    if path:
        path_obj = Path(path)
        with open(path_obj) as f:
            yaml_str = f.read()
    if not yaml_str:
        raise ValueError("YAML content is empty.")

    final_yaml_str = PipelineParsingService._replace_variables(yaml_str)
    config = yaml.safe_load(final_yaml_str)
    pipeline_config = PipelineConfig.metadata_to_instance(config)
    steps = PipelineParsingService._get_steps(pipeline_config.steps)
    pipeline = Pipeline(name=pipeline_config.name, steps=steps)  # type: ignore
    console_logger.info("Pipeline [ '%s' ] parsed successfully with %d steps.", pipeline.name, len(pipeline.steps))
    return pipeline
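
A hypothetical round trip; the YAML layout sketched below is inferred from _get_steps and is only illustrative, since the authoritative schema lives in PipelineConfig (not shown here), and READ_FILES stands in for any registered action name:

from cloe_nessy.pipeline.pipeline_parsing_service import PipelineParsingService

yaml_str = """
name: demo_pipeline
steps:
  Read Files:
    action: READ_FILES
    options:
      location: /landing/raw
"""

pipeline = PipelineParsingService.parse(yaml_str=yaml_str)
pipeline.run()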

register_pipeline_action(pipeline_action_class) staticmethod

Registers a custom pipeline action class.

Note

Registering an action enables the custom action to be used in the pipeline YAML definition. This method is called automatically when the PipelineParsingService is instantiated with a list of custom actions.

Source code in src/cloe_nessy/pipeline/pipeline_parsing_service.py
@staticmethod
def register_pipeline_action(pipeline_action_class):
    """Registers a custom pipeline action class.

    !!! note
        Registering an action enables the custom action to be used in the
        pipeline YAML definition. This method is called automatically when the
        PipelineParsingService is instantiated with a list of custom actions.
    """
    console_logger = LoggerMixin().get_console_logger()
    console_logger.info("Registering custom pipeline action [' %s ']", pipeline_action_class.name)
    pipeline_actions[pipeline_action_class.name] = pipeline_action_class

    global PipelineActionType
    PipelineActionType = Enum("PipelineActionType", pipeline_actions)
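
Hypothetical registration, reusing the UppercaseColumnsAction sketch from the PipelineAction section above; both call styles populate the same registry:

from cloe_nessy.pipeline.pipeline_parsing_service import PipelineParsingService

# Explicit registration ...
PipelineParsingService.register_pipeline_action(UppercaseColumnsAction)

# ... or, equivalently, at construction time:
service = PipelineParsingService(custom_actions=[UppercaseColumnsAction])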

PipelineStep dataclass

A PipelineStep is a logical step within a Pipeline.

The step stores the PipelineContext and offers an interface to interact with the step's DataFrame.

Attributes:

name (str): The name of the step.
action (PipelineAction): The action to be executed.
is_successor (bool): A boolean indicating if the step is a successor and takes the previous step's context.
context (PipelineContext): The context of the step.
options (dict[str, Any]): Additional options for the step.
_predecessors (set[str]): A set of names of the steps that are predecessors to this step.
_context_ref (str | None): Reference to the previous step's context.
_table_metadata_ref (str | None): Reference to the previous step's metadata.

Source code in src/cloe_nessy/pipeline/pipeline_step.py
@dataclass
class PipelineStep:
    """A PipelineStep is a logical step within a Pipeline.

    The step stores the PipelineContext and offers an interface to interact with
    the step's DataFrame.

    Attributes:
        name: The name of the step.
        action: The action to be executed.
        is_successor: A boolean indicating if the step is a successor and takes
            the previous step's context.
        context: The context of the step.
        options: Additional options for the step.
        _predecessors: A set of names of the steps that are predecessors to this step.
        _context_ref: Reference to the previous step's context.
        _table_metadata_ref: Reference to the previous step's metadata.
    """

    name: str
    action: PipelineAction
    context: PipelineContext = field(default_factory=lambda: PipelineContext())
    options: dict[str, Any] = field(default_factory=lambda: {})
    result: PipelineContext = field(default_factory=lambda: PipelineContext())
    _predecessors: set[str] = field(default_factory=lambda: set())
    _context_ref: str | None = None
    _table_metadata_ref: str | None = None

    def __post_init__(self) -> None:
        if not isinstance(self.action, PipelineAction):
            raise ValueError("action must be a PipelineAction subclass.")
        if self._context_ref:
            self._predecessors.add(self._context_ref)
        if self._table_metadata_ref:
            self._predecessors.add(self._table_metadata_ref)
        if self.options:
            for val in self.options.values():
                if isinstance(val, PipelineStep):
                    self._predecessors.add(val.name)

    def run(self) -> None:
        """Execute the action on the context."""
        self.result = self.action.run(context=self.context, **self.options)

run()

Execute the action on the context.

Source code in src/cloe_nessy/pipeline/pipeline_step.py
def run(self) -> None:
    """Execute the action on the context."""
    self.result = self.action.run(context=self.context, **self.options)
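
Predecessor wiring in isolation: __post_init__ derives _predecessors from _context_ref, _table_metadata_ref, and any PipelineStep passed as an option value. The action instances below (extract_action, join_action) are assumed to exist:

upstream = PipelineStep(name="upstream", action=extract_action)
downstream = PipelineStep(
    name="downstream",
    action=join_action,
    options={"other_table": upstream},  # a step passed as an option value
    _context_ref="upstream",
)

print(downstream._predecessors)  # {'upstream'}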