Bases: PipelineAction
        Fixes column names in the DataFrame to be valid.
Replaces every invalid character in the column names, including the field names of nested structs,
with an underscore and replaces a single leading underscore of a column name with a double underscore.
  The following are treated as invalid:
  
- Any non-word character (anything other than letters, digits, and underscores).
- A single leading underscore at the start of a column name.
 
  Example:
  ```yaml
  Clean Column Names:
      action: TRANSFORM_CLEAN_COLUMN_NAMES
  ```
 
              
                Source code in src/cloe_nessy/pipeline/actions/transform_clean_column_names.py
                |  | class TransformCleanColumnNamesAction(PipelineAction):
    """Fixes column names in the DataFrame to be valid.
    Removes invalid characters from the column names, including the fields of a struct and
    replaces a single leading underscore by a double underscore.
    Invalid characters include:
        - Any non-word character (anything other than letters, digits, and underscores).
        - A single leading underscore.
    Example:
        ```yaml
        Clean Column Names:
            action: TRANSFORM_CLEAN_COLUMN_NAMES
        ```
    """
    name: str = "TRANSFORM_CLEAN_COLUMN_NAMES"
    def run(
        self,
        context: PipelineContext,
        **_: Any,
    ) -> PipelineContext:
        """Fixes column names in the DataFrame to be valid.
        Removes invalid characters from the column names, including the fields of a struct and
        replaces a single leading underscore by a double underscore.
        Args:
            context: The context in which this Action is executed.
        Raises:
            ValueError: If the data from the context is None.
        Returns:
            The context after the execution of this Action, containing the DataFrame with cleaned column names.
        """
        if context.data is None:
            raise ValueError("Data from the context is required for the operation.")
        with_columns_renamed = {}
        with_columns_casted: dict[str, T.StructType | T.ArrayType | T.MapType] = {}
        single_underscrore_at_beginning = r"^_(?=[^_])"
        for c in context.data.schema:
            old_name = c.name
            new_name = re.sub(single_underscrore_at_beginning, "__", re.sub(r"\W", "_", old_name))
            with_columns_renamed[old_name] = new_name
            if isinstance(c.dataType, (T.StructType | T.ArrayType | T.MapType)):
                old_column_schema = c.dataType.json()
                new_column_schema = re.sub(
                    r'(?<="name":")[^"]+',
                    lambda m: re.sub(r"\W", "_", str(m.group())),
                    old_column_schema,
                )
                if isinstance(c.dataType, T.StructType):
                    with_columns_casted[new_name] = T.StructType.fromJson(json.loads(new_column_schema))
                elif isinstance(c.dataType, T.ArrayType):
                    with_columns_casted[new_name] = T.ArrayType.fromJson(json.loads(new_column_schema))
                elif isinstance(c.dataType, T.MapType):
                    with_columns_casted[new_name] = T.MapType.fromJson(json.loads(new_column_schema))
        df = context.data.withColumnsRenamed(with_columns_renamed)
        for c_name, c_type in with_columns_casted.items():
            df = df.withColumn(c_name, F.col(c_name).cast(c_type))
        return context.from_existing(data=df)  # type: ignore
 | 
 
  
    
        Fixes column names in the DataFrame to be valid.
Replaces every invalid character in the column names, including the field names of nested structs,
with an underscore and replaces a single leading underscore of a column name with a double underscore.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| context | PipelineContext | The context in which this Action is executed. | required |

Raises:

| Type | Description |
| --- | --- |
| ValueError | If the data from the context is None. |

Returns:

| Type | Description |
| --- | --- |
| PipelineContext | The context after the execution of this Action, containing the DataFrame with cleaned column names. |

            
              Source code in src/cloe_nessy/pipeline/actions/transform_clean_column_names.py
              |  | def run(
    self,
    context: PipelineContext,
    **_: Any,
) -> PipelineContext:
    """Fixes column names in the DataFrame to be valid.
    Removes invalid characters from the column names, including the fields of a struct and
    replaces a single leading underscore by a double underscore.
    Args:
        context: The context in which this Action is executed.
    Raises:
        ValueError: If the data from the context is None.
    Returns:
        The context after the execution of this Action, containing the DataFrame with cleaned column names.
    """
    if context.data is None:
        raise ValueError("Data from the context is required for the operation.")
    with_columns_renamed = {}
    with_columns_casted: dict[str, T.StructType | T.ArrayType | T.MapType] = {}
    single_underscrore_at_beginning = r"^_(?=[^_])"
    for c in context.data.schema:
        old_name = c.name
        new_name = re.sub(single_underscrore_at_beginning, "__", re.sub(r"\W", "_", old_name))
        with_columns_renamed[old_name] = new_name
        if isinstance(c.dataType, (T.StructType | T.ArrayType | T.MapType)):
            old_column_schema = c.dataType.json()
            new_column_schema = re.sub(
                r'(?<="name":")[^"]+',
                lambda m: re.sub(r"\W", "_", str(m.group())),
                old_column_schema,
            )
            if isinstance(c.dataType, T.StructType):
                with_columns_casted[new_name] = T.StructType.fromJson(json.loads(new_column_schema))
            elif isinstance(c.dataType, T.ArrayType):
                with_columns_casted[new_name] = T.ArrayType.fromJson(json.loads(new_column_schema))
            elif isinstance(c.dataType, T.MapType):
                with_columns_casted[new_name] = T.MapType.fromJson(json.loads(new_column_schema))
    df = context.data.withColumnsRenamed(with_columns_renamed)
    for c_name, c_type in with_columns_casted.items():
        df = df.withColumn(c_name, F.col(c_name).cast(c_type))
    return context.from_existing(data=df)  # type: ignore
 |