Base Metadata objects

The base metadata objects form the first layer of metadata. They are loaded and validated before any other code runs. Validation at this layer covers only the most basic requirements, such as whether a value has the correct datatype or whether a name conforms to the naming standards.

Please note that some metadata objects have advanced functionality, such as extended validation that also checks for consistency and best practices. In many cases this includes additional features like templating logic for certain fields. Please check the corresponding shared metadata objects for these classes.
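
A minimal sketch of this first load-and-validate step, assuming the classes can be imported from the module paths shown under each object below and that the metadata repository lives in a local model/ directory:

import pathlib

# Import path follows the source file shown below (cloe_metadata/base/batch/batch.py);
# the public import location may differ.
from cloe_metadata.base.batch.batch import Batches

# "model" is a hypothetical repository root containing orchestration/batches/
# with one JSON file per batch.
batches, errors = Batches.read_instances_from_disk(pathlib.Path("model"))

# Base objects collect basic validation problems instead of failing fast.
for error in errors:
    print(error)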

Object list

Batch

Bases: WithSubfoldersMixin

Base class for loading Batch model objects.

Parameters:

id (UUID): The batch ID so that it can be referenced. Default: UUID('48ee597b-623d-44a3-a6d3-aa7b5a748469')
name (str): Name of the Batch (must be unique). Required.
cron (str): Cron schedule for batch scheduling. Required.
batchsteps (list[Batchstep]): A list of all batchsteps belonging to that batch. Required.
timezone (str): Timezone for the cron schedule. Required.
tags (str | None): Tags of the Batch. Default: None

Source code in cloe_metadata/base/batch/batch.py
class Batch(WithSubfoldersMixin):
    """Base class for loading Batch model objects."""

    subfolder_path: ClassVar[pathlib.Path] = pathlib.Path("batchsteps")
    exclude_when_writing_to_disk: ClassVar[set] = {"batchsteps"}
    attribute_used_for_filename: ClassVar[str] = "name"

    id: uuid.UUID = Field(
        title="Batch ID",
        default_factory=uuid.uuid4,
        description="The batch ID so that it can referenced.",
    )
    name: str = Field(..., description="Name of the Batch(must be unique)")
    cron: str = Field(..., description="Cron schedule for batch scheduling.")
    batchsteps: list[Batchstep] = Field(
        ..., description="A list of all batchsteps belonging to that batch."
    )
    timezone: str = Field(..., description="Timezone for cron")
    tags: str | None = Field(default=None, description="Tags of the Batch")
    _batchsteps_cache: dict[uuid.UUID, Batchstep] = {}

    _check_name_w_replace = field_validator("name")(
        validators.name_alphanumeric_w_replace
    )

    @field_validator("cron")
    @classmethod
    def cron_valid_check(cls, value):
        if not croniter.is_valid(value):
            raise ValueError("is not a valid cron")
        return value

    @field_validator("batchsteps")
    @classmethod
    def child_uniqueness_check(cls, value):
        validators.find_non_unique(value, "name")
        return value

    @classmethod
    def json_object_to_class(
        cls, data: dict, instance_dir: pathlib.Path
    ) -> tuple[Batch | None, list[ValidationError | json.JSONDecodeError]]:
        instance_folderpath = instance_dir / cls.subfolder_path
        batchsteps, sub_errors = Batchstep.read_instances_from_disk(instance_folderpath)
        try:
            instance = cls(**data, batchsteps=batchsteps)
        except ValidationError as e:
            instance = None
            sub_errors.append(e)
        return instance, sub_errors

    def _write_childs_to_disk(self, sub_output_path: pathlib.Path) -> None:
        for child in self.batchsteps:
            child.write_to_disk(sub_output_path / self.subfolder_path)

    def get_batchsteps(self) -> dict[uuid.UUID, Batchstep]:
        if len(self._batchsteps_cache) < 1:
            self._batchsteps_cache = {jobs.id: jobs for jobs in self.batchsteps}
        return self._batchsteps_cache
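
A small construction sketch, assuming Batch and Batchstep can be imported from the module paths shown in this document; names and IDs are illustrative:

import uuid

from cloe_metadata.base.batch.batch import Batch  # assumed import path
from cloe_metadata.base.batch.batchstep import Batchstep  # assumed import path

step = Batchstep(id=uuid.uuid4(), name="LoadOrders", job_id=uuid.uuid4())

# "0 3 * * *" passes the croniter check; an invalid expression such as
# "every day" would raise a ValidationError instead.
batch = Batch(
    name="NightlyLoad",
    cron="0 3 * * *",
    timezone="Europe/Berlin",
    batchsteps=[step],
)

# get_batchsteps() builds and caches a lookup keyed by batchstep ID.
assert batch.get_batchsteps()[step.id].name == "LoadOrders"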

Batches

Bases: BaseModel

Base class for loading Batch model objects.

Parameters:

batches (list[Batch]): Default: []

Source code in cloe_metadata/base/batch/batch.py
class Batches(BaseModel):
    """Base class for loading Batch model objects."""

    subfolder_path: ClassVar[pathlib.Path] = pathlib.Path("orchestration", "batches")

    batches: list[Batch] = []

    @field_validator("batches")
    @classmethod
    def child_uniqueness_check(cls, value: list[Batch]):
        validators.find_non_unique(value, "name")
        return value

    @classmethod
    def read_instances_from_disk(
        cls, input_path: pathlib.Path
    ) -> tuple[Batches, list[ValidationError | json.JSONDecodeError]]:
        if not input_path.exists():
            raise FileNotFoundError(f"Directory not found: {input_path}")
        instances, errors = Batch.read_instances_from_disk(
            input_path / cls.subfolder_path
        )
        try:
            instance = cls(batches=instances)
        except ValidationError as e:
            errors.append(e)
        return instance, errors

    def write_to_disk(self, output_path: pathlib.Path) -> None:
        for child in self.batches:
            child.write_to_disk(output_path / self.subfolder_path)

Batchstep

Bases: WithoutSubfoldersMixin

Base class for loading Batchstep model objects.

Parameters:

id (UUID): UUID of the Batchstep. Required.
name (str): Name of the Batchstep (must be unique). Required.
job_id (UUID): Reference to a job. Required.
tags (str | None): Tags of the Batchstep. Default: None
dependencies (list[Annotated[BatchstepDependency, AfterValidator]] | None): All dependencies belonging to that job. Default: None

Source code in cloe_metadata/base/batch/batchstep.py
class Batchstep(WithoutSubfoldersMixin):
    """Base class for loading Batchstep model objects."""

    attribute_used_for_filename: ClassVar[str] = "name"

    id: uuid.UUID = Field(..., description="UUID of the Batchstep")
    name: str = Field(..., description="Name of the Batchstep(must be unique)")
    job_id: uuid.UUID = Field(..., description="Reference to a job")
    tags: str | None = Field(default=None, description="Tags of the Batchstep")
    dependencies: (
        list[
            Annotated[BatchstepDependency, AfterValidator(dependencies_self_dependency)]
        ]
        | None
    ) = Field(default=None, description="All dependencies belonging to that job.")

    _check_name_w_replace = field_validator("name")(
        validators.name_alphanumeric_w_replace
    )

    @classmethod
    def json_object_to_class(
        cls, data: dict
    ) -> tuple[Batchstep | None, list[ValidationError]]:
        errors = []
        dependencies = []
        for dependency in data.pop("dependencies", []):
            try:
                dependencies.append(BatchstepDependency(**dependency))
            except ValidationError as e:
                errors.append(e)
        try:
            instance = cls(**data, dependencies=dependencies)
        except ValidationError as e:
            instance = None
            errors.append(e)
        return instance, errors

    def get_dependencies(self) -> list[uuid.UUID]:
        if self.dependencies is None:
            return []
        else:
            return [i.dependent_on_batchstep_id for i in self.dependencies]
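
json_object_to_class collects validation problems instead of raising, which is how the collection loaders aggregate errors across many files. A hedged sketch (import path assumed from the source location shown above):

import uuid

from cloe_metadata.base.batch.batchstep import Batchstep  # assumed import path

raw = {"id": str(uuid.uuid4()), "name": "LoadOrders"}  # job_id is missing on purpose

instance, errors = Batchstep.json_object_to_class(raw)
assert instance is None
assert len(errors) == 1  # the missing job_id is reported as a ValidationError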

Connection

Bases: WithoutSubfoldersMixin

Base class for loading Connection model objects.

Parameters:

connection_string_secret_name (str): Name of the secret that holds the connection information (e.g. password or host). Required.
id (UUID): UUID of the connection. Required.
name (str): Name of the connection (must be unique). Required.
system_type (str): Connection type. Required.
service_level (str | None): Service level of the connection. Default: None
is_file_catalog_connection (bool): Whether the FileCatalog can be reached via this connection. Default: False

Source code in cloe_metadata/base/jobs/connections.py
class Connection(WithoutSubfoldersMixin):
    """Base class for loading Connection model objects."""

    attribute_used_for_filename: ClassVar[str] = "name"

    azure_synapse_key: ClassVar[str] = "Azure Synapse Analytics"
    sql_server_nativ_key: ClassVar[str] = "SQL Server >=2016"
    azure_server_nativ_key: ClassVar[str] = "Azure SQL Server"
    snowflake_nativ_key: ClassVar[str] = "Snowflake"
    azure_blob_key: ClassVar[str] = "AzureBlob"
    oracle_key: ClassVar[str] = "Oracle"
    db2_key: ClassVar[str] = "DB2"
    postgresql_key: ClassVar[str] = "PostgreSQL"
    azurepostgresql_key: ClassVar[str] = "AzurePostgreSQL"

    connection_string_secret_name: str = Field(
        ...,
        description="Name of the secret that holds the connection information(e.g. password or host)",
    )
    id: uuid.UUID = Field(..., description="UUID of the connection")
    name: str = Field(..., description="Name of the connection(must be unique)")
    system_type: str = Field(..., description="Connection type")
    service_level: str | None = Field(
        default=None, description="service level of the connection"
    )
    is_file_catalog_connection: bool = Field(
        default=False, description="Can the FileCatalog reached via this connection?"
    )

    _check_name_w_replace = field_validator("name")(
        validators.name_alphanumeric_w_replace
    )

    @field_validator("system_type")
    @classmethod
    def available_system_type(cls, value):
        available_systems = [
            cls.azure_synapse_key,
            cls.sql_server_nativ_key,
            cls.azure_server_nativ_key,
            cls.snowflake_nativ_key,
            cls.azure_blob_key,
            cls.oracle_key,
            cls.db2_key,
            cls.postgresql_key,
            cls.azurepostgresql_key,
        ]
        if value not in available_systems:
            raise ValueError("is unknown system type")
        return value

    @property
    def is_snowflake_nativ(self) -> bool:
        return self.system_type == self.snowflake_nativ_key

    @property
    def is_azure_sql_nativ(self) -> bool:
        return self.system_type == self.azure_server_nativ_key

    @property
    def is_sql_server_nativ(self) -> bool:
        return self.system_type == self.sql_server_nativ_key

    @property
    def is_synapse_nativ(self) -> bool:
        return self.system_type == self.azure_synapse_key

    @property
    def is_db2_nativ(self) -> bool:
        return self.system_type == self.db2_key

    @property
    def is_postgres_sql_nativ(self) -> bool:
        return self.system_type == self.postgresql_key

    @property
    def is_azurepostgres_sql_nativ(self) -> bool:
        return self.system_type == self.azurepostgresql_key

    @property
    def is_oracle_db(self) -> bool:
        return self.system_type == self.oracle_key

    @property
    def is_azure_blob(self) -> bool:
        return self.system_type == self.azure_blob_key

    @property
    def is_tsql(self) -> bool:
        return (
            self.is_synapse_nativ or self.is_azure_sql_nativ or self.is_sql_server_nativ
        )

    def get_short_id(self) -> str:
        return str(self.id).split("-")[0]
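
The system_type validator only accepts the class-level keys listed above, and the convenience properties then classify the connection. A sketch with an assumed import path:

import uuid

from cloe_metadata.base.jobs.connections import Connection  # assumed import path

conn = Connection(
    id=uuid.uuid4(),
    name="DwhSnowflake",
    system_type=Connection.snowflake_nativ_key,  # "Snowflake"
    connection_string_secret_name="dwh-snowflake-connection-string",
)

assert conn.is_snowflake_nativ
assert not conn.is_tsql  # only the SQL Server / Azure SQL / Synapse types count as T-SQL
print(conn.get_short_id())  # first block of the UUID, e.g. "48ee597b"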

Connections

Bases: BaseModel

Base class for loading CLOE Connection model objects.

Parameters:

connections (list[Connection]): Default: []

Source code in cloe_metadata/base/jobs/connections.py
class Connections(BaseModel):
    """Base class for loading CLOE Connection model objects."""

    subfolder_path: ClassVar[pathlib.Path] = pathlib.Path("jobs", "connections")

    connections: list[Connection] = []

    _connections_cache: dict[uuid.UUID, Connection] = {}

    @field_validator("connections")
    @classmethod
    def child_uniqueness_check(cls, value: list[Connection]):
        validators.find_non_unique(value, "name")
        return value

    @classmethod
    def read_instances_from_disk(
        cls, input_path: pathlib.Path
    ) -> tuple[Connections, list[ValidationError | json.JSONDecodeError]]:
        if not input_path.exists():
            raise FileNotFoundError(f"Directory not found: {input_path}")

        instances, errors = Connection.read_instances_from_disk(
            input_path / cls.subfolder_path
        )
        try:
            instance = cls(connections=instances)
        except ValidationError as e:
            errors.append(e)

        return instance, errors

    def write_to_disk(self, output_path: pathlib.Path) -> None:
        for child in self.connections:
            child.write_to_disk(output_path / self.subfolder_path)

    def get_connections(self) -> dict[uuid.UUID, Connection]:
        if len(self._connections_cache) < 1:
            self._connections_cache = {
                connection.id: connection for connection in self.connections
            }
        return self._connections_cache

ConversionTemplate

Bases: WithoutSubfoldersMixin

ConversionTemplate metadata model base class

Parameters:

id (UUID): Default: UUID('35de2a3f-19ad-490e-be17-e2d47bd579ee')
output_type (str): Required.
convert_template (str): Required.
on_convert_error_default_value (str): Required.

Source code in cloe_metadata/base/modeler/templates/conversion_template.py
class ConversionTemplate(WithoutSubfoldersMixin):
    """ConversionTemplate metadata model base class"""

    attribute_used_for_filename: ClassVar[str] = "output_type"

    id: uuid.UUID = Field(default_factory=uuid.uuid4)
    output_type: str
    convert_template: str
    on_convert_error_default_value: str

    _check_name = field_validator("output_type")(validators.name_alphanumeric)

    _check_convert_template = field_validator("convert_template")(
        validators.check_if_valid_template
    )

ConversionTemplates

Bases: BaseModel

Base class for loading CLOE ConversionTemplate model objects.

Parameters:

conversion_templates (list[ConversionTemplate]): Default: []

Source code in cloe_metadata/base/modeler/templates/conversion_template.py
class ConversionTemplates(BaseModel):
    """Base class for loading CLOE ConversionTemplate model objects."""

    subfolder_path: ClassVar[pathlib.Path] = pathlib.Path(
        "modeler", "templates", "conversion_templates"
    )

    conversion_templates: list[ConversionTemplate] = []
    _conversion_template_cache: dict[str, ConversionTemplate] = {}
    model_config = ConfigDict(
        populate_by_name=True, alias_generator=writer.to_lower_camel_case
    )

    @field_validator("conversion_templates")
    @classmethod
    def child_uniqueness_check(cls, value: list[ConversionTemplate]):
        validators.find_non_unique(value, "output_type")
        return value

    @classmethod
    def read_instances_from_disk(
        cls, input_path: pathlib.Path
    ) -> tuple[ConversionTemplates, list[ValidationError | json.JSONDecodeError]]:
        if not input_path.exists():
            raise FileNotFoundError(f"Directory not found: {input_path}")

        instances, errors = ConversionTemplate.read_instances_from_disk(
            input_path / cls.subfolder_path
        )
        try:
            instance = cls(conversion_templates=instances)
        except ValidationError as e:
            errors.append(e)

        return instance, errors

    def write_to_disk(self, output_path: pathlib.Path) -> None:
        for child in self.conversion_templates:
            child.write_to_disk(output_path / self.subfolder_path)

    def get_templates(self) -> dict[str, ConversionTemplate]:
        if len(self._conversion_template_cache) < 1:
            self._conversion_template_cache = {
                template.output_type: template for template in self.conversion_templates
            }
        return self._conversion_template_cache
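
A sketch of registering a conversion template and looking it up by output type (import path assumed; the Jinja2 placeholder name is illustrative, real templates define their own variables):

from cloe_metadata.base.modeler.templates.conversion_template import (  # assumed path
    ConversionTemplate,
    ConversionTemplates,
)

template = ConversionTemplate(
    output_type="INT",
    # Must be a parseable Jinja2 template; the variable name is illustrative.
    convert_template="TRY_CAST({{ column_name }} AS INT)",
    on_convert_error_default_value="NULL",
)

templates = ConversionTemplates(conversion_templates=[template])

# get_templates() builds and caches a lookup keyed by output_type.
assert templates.get_templates()["INT"].convert_template == template.convert_template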

CustomDataflow

Bases: WithoutSubfoldersMixin

CustomDataflow metadata model base class

Parameters:

name (str): Name of the CustomDataFlow (must be unique). Required.
id (UUID): ID of the dataflow. Default: UUID('3347c205-37a4-4052-9fb3-30e03c57504f')
sql_pipe_template (str): Query template that should be used by the CustomDataFlow. Required.
table_mappings (list[TableMapping]): TableMappings belonging to that CustomDataFlow. Required.
job_id (UUID | None): Reference to the sink Exec_SQL job. Default: None

Source code in cloe_metadata/base/modeler/custom_dataflow/custom_dataflow.py
class CustomDataflow(WithoutSubfoldersMixin):
    """CustomDataflow metadata model base class"""

    attribute_used_for_filename: ClassVar[str] = "name"

    name: str = Field(..., description="Name of the CustomDataFlow(must be unique)")
    id: uuid.UUID = Field(default_factory=uuid.uuid4, description="ID of the dataflow")
    sql_pipe_template: str = Field(
        ..., description="Query template that should be used by CustomDataFlow"
    )
    table_mappings: list[table_mapping.TableMapping] = Field(
        ..., description="TableMappings belonging to that CustomDataFlow"
    )
    job_id: uuid.UUID | None = Field(
        default=None, description="Reference to the sink Exec_SQL job"
    )

    _check_name_w_replace = field_validator("name")(
        validators.name_alphanumeric_w_replace
    )

    _check_sql_pipe_template = field_validator("sql_pipe_template")(
        validators.check_if_valid_template
    )

    @field_validator("table_mappings")
    @classmethod
    def min_number_table_mappings(cls, value):
        if len(value) < 1:
            raise ValueError("at least one table mapping needs to be set.")
        return value

    @classmethod
    def json_object_to_class(
        cls, data: dict
    ) -> tuple[CustomDataflow | None, list[ValidationError]]:
        errors = []
        table_mappings = []
        for tm in data.pop("tableMappings", []):
            try:
                table_mappings.append(table_mapping.TableMapping(**tm))
            except ValidationError as e:
                errors.append(e)
        try:
            instance = cls(**data, table_mappings=table_mappings)
        except ValidationError as e:
            instance = None
            errors.append(e)
        return instance, errors

DB2FS

Bases: WithoutSubfoldersMixin

Base class for loading DB2FS model objects.

Parameters:

id (UUID): ID of the job. Required.
name (str): Name of the job (must be unique). Required.
description (str | None): Description of the job. Default: None
sink_connection_id (UUID): Reference to the sink connection. Will be used to establish a connection to the bucket/blob. Required.
source_connection_id (UUID): Reference to the source connection. Will be used to establish a connection to the database. Required.
container_name (str): Name of the bucket/blob the file is saved to. Required.
select_statement (str): This query is executed and its result set is stored in the file. Jinja2 templating is supported. Required.
dataset_type_id (UUID): Reference to a datasettype, which is partly used to generate the filename and to define the target filetype (e.g. Parquet). Required.
source_table_id (UUID): Reference to a table. Table metadata, e.g. its name or its column names, can be used in the select_statement. Required.
datasource_info_id (UUID): Reference to a datasourceinfo, which is partly used to generate the filename. Required.
folder_path (str | None): Path the file will be saved to. Default: None
sequence_column_name (str | None): Currently not supported. Default: None

Source code in cloe_metadata/base/jobs/db2fs.py
class DB2FS(WithoutSubfoldersMixin):
    """Base class for loading DB2FS model objects."""

    attribute_used_for_filename: ClassVar[str] = "name"

    id: uuid.UUID = Field(..., description="ID of the job")
    name: str = Field(..., description="Name of the job(must be unique)")
    description: str | None = Field(default=None, description="Description of the job.")
    sink_connection_id: uuid.UUID = Field(
        ...,
        description="Reference the sink connection. Will be used to establish a connection to the bucket/blob.",
    )
    source_connection_id: uuid.UUID = Field(
        ...,
        description="Reference the sink connection. Will be used to establish a connection to the database.",
    )
    container_name: str = Field(
        ..., description="Name of the bucket/blob where the file is saved to."
    )
    select_statement: str = Field(
        ...,
        description="This query is executed and its result set stored in the file. Jinja2 templating is supported.",
    )
    dataset_type_id: uuid.UUID = Field(
        ...,
        description="Reference a datasettype which is partly used to generate the filename and to define the target filetype(e.g. Parquet).",
    )
    source_table_id: uuid.UUID = Field(
        ...,
        description="Reference a table. Table metadata e.g. its Name or table column names can be used in the Source_SelectStatement.",
    )
    datasource_info_id: uuid.UUID = Field(
        ...,
        description="Reference a datasourceinfo which is partly used to generate the filename.",
    )
    folder_path: str | None = Field(
        default=None, description="Path where the file will be saved to."
    )
    sequence_column_name: str | None = Field(
        default=None, description="Currently not supported"
    )

    _check_name_w_replace = field_validator("name")(
        validators.name_alphanumeric_w_replace
    )

    _check_select_statement = field_validator("select_statement")(
        validators.check_if_valid_template
    )

    _check_folder_path = field_validator("folder_path")(
        validators.check_if_valid_template
    )
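
A construction sketch for a DB2FS job; the UUIDs are illustrative placeholders that would normally reference existing connections, tables, datasettypes and datasourceinfos (import path assumed):

import uuid

from cloe_metadata.base.jobs.db2fs import DB2FS  # assumed import path

job = DB2FS(
    id=uuid.uuid4(),
    name="ExportOrders",
    sink_connection_id=uuid.uuid4(),    # bucket/blob connection
    source_connection_id=uuid.uuid4(),  # source database connection
    container_name="raw",
    # select_statement and folder_path are validated as Jinja2 templates;
    # the variable names used here are illustrative.
    select_statement="SELECT * FROM {{ source_table }}",
    folder_path="orders/{{ ds }}/",
    dataset_type_id=uuid.uuid4(),
    source_table_id=uuid.uuid4(),
    datasource_info_id=uuid.uuid4(),
)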

DataSourceInfo

Bases: WithoutSubfoldersMixin

Base class for loading DataSourceInfo model objects.

Parameters:

id (UUID): Required.
name (str | None): Default: None
content (Literal['full', 'delta']): Required.
sourcesystem_id (UUID): Required.
tenant_id (UUID | None): Default: None
object_description (str | None): Default: None

Source code in cloe_metadata/base/repository/data_source_info.py
class DataSourceInfo(WithoutSubfoldersMixin):
    """Base class for loading DataSourceInfo model objects."""

    attribute_used_for_filename: ClassVar[str] = "name"

    id: uuid.UUID
    name: str | None = None
    content: Literal["full", "delta"]
    sourcesystem_id: uuid.UUID
    tenant_id: uuid.UUID | None = None
    object_description: str | None = None

    _check_name = field_validator("object_description")(validators.name_alphanumeric)
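
A minimal sketch; content only accepts "full" or "delta", and object_description must pass the alphanumeric name check (import path assumed):

import uuid

from cloe_metadata.base.repository.data_source_info import DataSourceInfo  # assumed path

info = DataSourceInfo(
    id=uuid.uuid4(),
    name="ErpOrders",
    content="full",  # Literal["full", "delta"]
    sourcesystem_id=uuid.uuid4(),
    object_description="ErpOrders",
)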

DataSourceInfos

Bases: BaseModel

Base class for loading DataSourceInfos model objects.

Parameters:

data_source_infos (list[DataSourceInfo]): Default: []

Source code in cloe_metadata/base/repository/data_source_info.py
class DataSourceInfos(BaseModel):
    """Base class for loading DataSourceInfos model objects."""

    subfolder_path: ClassVar[pathlib.Path] = pathlib.Path(
        "repository", "data_source_infos"
    )

    data_source_infos: list[DataSourceInfo] = Field(default=[])
    _data_source_infos_cache: dict[uuid.UUID, DataSourceInfo] = {}

    model_config = ConfigDict(
        populate_by_name=True, alias_generator=writer.to_lower_camel_case
    )

    @field_validator("data_source_infos")
    @classmethod
    def child_uniqueness_check(cls, value: list[DataSourceInfo]):
        validators.find_non_unique(value, "object_description")
        return value

    def get_data_source_infos(self) -> dict[uuid.UUID, DataSourceInfo]:
        if len(self._data_source_infos_cache) < 1:
            self._data_source_infos_cache = {
                data_source_info.id: data_source_info
                for data_source_info in self.data_source_infos
            }
        return self._data_source_infos_cache

    @classmethod
    def read_instances_from_disk(
        cls, input_path: pathlib.Path
    ) -> tuple[DataSourceInfos, list[ValidationError | json.JSONDecodeError]]:
        if not input_path.exists():
            raise FileNotFoundError(f"Directory not found: {input_path}")

        instances, errors = DataSourceInfo.read_instances_from_disk(
            input_path / cls.subfolder_path
        )
        try:
            instance = cls(data_source_infos=instances)
        except ValidationError as e:
            errors.append(e)

        return instance, errors

    def write_to_disk(self, output_path: pathlib.Path) -> None:
        for child in self.data_source_infos:
            child.write_to_disk(output_path / self.subfolder_path)

Database

Bases: WithSubfoldersMixin

Base class for loading Database model objects.

Parameters:

id (UUID): ID of the database. Default: UUID('bdebe43f-047b-41e9-80de-6a038964261a')
display_name (str | None): Display name of the database, mostly used for the GUI. Default: None
name (str): Technical name of the database. Required.
schemas (list[Schema]): Default: []

Source code in cloe_metadata/base/repository/database/database.py
class Database(WithSubfoldersMixin):
    """Base class for loading Database model objects."""

    subfolder_path: ClassVar[pathlib.Path] = pathlib.Path("schemas")
    exclude_when_writing_to_disk: ClassVar[set] = {"schemas"}
    attribute_used_for_filename: ClassVar[str] = "name"

    id: uuid.UUID = Field(default_factory=uuid.uuid4, description="ID of the database")
    display_name: str | None = Field(
        default=None, description="Display name of the database, mostly used for GUI."
    )
    name: str = Field(..., description="Technical name of the database")
    schemas: list[schema.Schema] = []

    _check_name_template = field_validator("name")(validators.check_if_valid_template)

    @field_validator("schemas")
    @classmethod
    def child_uniqueness_check(cls, value: list[schema.Schema]):
        validators.find_non_unique(value, "name")
        return value

    @classmethod
    def json_object_to_class(
        cls, data: dict, instance_dir: pathlib.Path
    ) -> tuple[Database | None, list[ValidationError | json.JSONDecodeError]]:
        instance_folderpath = instance_dir / cls.subfolder_path
        schemas, sub_errors = schema.Schema.read_instances_from_disk(
            instance_folderpath
        )
        try:
            instance = cls(**data, schemas=schemas)
        except ValidationError as e:
            instance = None
            sub_errors.append(e)
        return instance, sub_errors

    def _write_childs_to_disk(self, sub_output_path: pathlib.Path) -> None:
        for child in self.schemas:
            child.write_to_disk(sub_output_path / self.subfolder_path)

Databases

Bases: BaseModel

Base class for loading Databases model objects.

Parameters:

databases (list[Database]): Required.

Source code in cloe_metadata/base/repository/database/database.py
class Databases(BaseModel):
    """Base class for loading Databases model objects."""

    subfolder_path: ClassVar[pathlib.Path] = pathlib.Path("repository", "databases")

    databases: list[Database]
    _tables_cache: dict[uuid.UUID, table.Table] = {}
    model_config = ConfigDict(
        populate_by_name=True, alias_generator=writer.to_lower_camel_case
    )

    @field_validator("databases")
    @classmethod
    def child_uniqueness_check(cls, value: list[Database]):
        validators.find_non_unique(value, "name")
        return value

    @property
    def id_to_tables(self) -> dict[uuid.UUID, table.Table]:
        if len(self._tables_cache) < 1:
            self._tables_cache = {
                table.id: table
                for database in self.databases
                for schema in database.schemas
                for table in schema.tables
            }
        return self._tables_cache

    def get_table_and_schema(
        self, table_id: uuid.UUID
    ) -> tuple[schema.Schema | None, table.Table | None]:
        for database in self.databases:
            for schema_obj in database.schemas:
                for table_obj in schema_obj.tables:
                    if table_obj.id == table_id:
                        return schema_obj, table_obj
        return None, None

    @classmethod
    def read_instances_from_disk(
        cls, input_path: pathlib.Path
    ) -> tuple[Databases, list[ValidationError | json.JSONDecodeError]]:
        if not input_path.exists():
            raise FileNotFoundError(f"Directory not found: {input_path}")

        instances, errors = Database.read_instances_from_disk(
            input_path / cls.subfolder_path
        )
        try:
            instance = cls(databases=instances)
        except ValidationError as e:
            errors.append(e)

        return instance, errors

    def write_to_disk(
        self, output_path: pathlib.Path, delete_existing: bool = False
    ) -> None:
        sub_output_path = output_path / self.subfolder_path
        if delete_existing and sub_output_path.exists() and sub_output_path.is_dir():
            shutil.rmtree(sub_output_path)
        for child in self.databases:
            child.write_to_disk(output_path / self.subfolder_path, delete_existing)
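
A sketch of resolving a table reference through the Databases collection, assuming a metadata repository under a local model/ directory (with repository/databases/ beneath it) and that the import path matches the source location shown above:

import pathlib
import uuid

from cloe_metadata.base.repository.database.database import Databases  # assumed path

databases, errors = Databases.read_instances_from_disk(pathlib.Path("model"))

some_table_id = uuid.UUID("00000000-0000-0000-0000-000000000000")  # placeholder
schema_obj, table_obj = databases.get_table_and_schema(some_table_id)
if table_obj is None:
    print("unknown table id")

# id_to_tables builds and caches a flat {table_id: Table} lookup across all
# databases and schemas.
all_tables = databases.id_to_tables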

Dataflow

Bases: WithoutSubfoldersMixin

Dataflow metadata model base class

Parameters:

name (str): Name of the DataFlow (must be unique). Required.
sink_table_id (UUID): Reference to the sink table. Required.
sql_template_id (int): Reference to the SQL template that should be used. Required.
id (UUID): ID of the dataflow. Default: UUID('b4767bd0-372c-4fb2-be13-979bf1c7f8ba')
job_id (UUID | None): Reference to the sink Exec_SQL job. Default: None
include_dq1 (bool): Use DQ1 features. Default: True
column_mappings (list[Annotated[ColumnMapping, AfterValidator]]): Required.
include_dq2 (bool): Use DQ2 features. Default: True
include_dq3 (bool): Use DQ3 features. Default: False
log_dq1 (bool): Log DQ1 errors. Default: True
log_dq2 (bool): Log DQ2 errors. Default: True
log_dq3 (bool): Log DQ3 errors. Default: False
source_tables (list[SourceTable]): Required.
lookups (list[Lookup] | None): Default: None
post_processing_sql (str | None): SQL query executed after the Dataflow. Default: None
pre_processing_sql (str | None): SQL query executed before the Dataflow. Default: None

Source code in cloe_metadata/base/modeler/dataflow/dataflow.py
class Dataflow(WithoutSubfoldersMixin):
    """Dataflow metadata model base class"""

    attribute_used_for_filename: ClassVar[str] = "name"

    name: str = Field(..., description="Name of the DataFlow(must be unique)")
    sink_table_id: uuid.UUID = Field(..., description="Reference to the sink table")
    sql_template_id: int = Field(
        ..., description="Reference to sql template that should be used"
    )
    id: uuid.UUID = Field(default_factory=uuid.uuid4, description="ID of the dataflow")
    job_id: uuid.UUID | None = Field(
        default=None, description="Reference to the sink Exec_SQL job"
    )
    include_dq1: bool = Field(default=True, description="Use DQ1 features")
    column_mappings: list[
        Annotated[
            column_mapping.ColumnMapping,
            AfterValidator(lookup_parameters_order_by_unique_check),
        ]
    ]
    include_dq2: bool = Field(default=True, description="Use DQ2 features")
    include_dq3: bool = Field(default=False, description="Use DQ3 Features")
    log_dq1: bool = Field(default=True, description="Log DQ1 errors")
    log_dq2: bool = Field(default=True, description="Log DQ2 errors")
    log_dq3: bool = Field(default=False, description="Log DQ3 errors")
    source_tables: list[source_table.SourceTable]
    lookups: list[lib_lookup.Lookup] | None = Field(default=None)
    post_processing_sql: str | None = Field(
        default=None, description="Execute SQL query after Dataflow"
    )
    pre_processing_sql: str | None = Field(
        default=None, description="Execute SQL query before Dataflow"
    )

    _check_name_w_replace = field_validator("name")(
        validators.name_alphanumeric_w_replace
    )

    @field_validator("column_mappings")
    @classmethod
    def column_mapping_bk_check(cls, value: list[column_mapping.ColumnMapping]):
        if any([column_mapping.bk_order is not None for column_mapping in value]):
            return value
        else:
            raise ValueError("no bk set.")

    @field_validator("include_dq2")
    @classmethod
    def dq2_and_conversion_check(cls, value: bool, info: ValidationInfo):
        column_mappings: list[column_mapping.ColumnMapping] = info.data.get(
            "column_mappings", []
        )
        if len(column_mappings) > 0:
            if any(
                [
                    column_mapping.convert_to_datatype is not None
                    for column_mapping in column_mappings
                ]
            ):
                return value
            else:
                logger.warning("DQ2 activated but no conversions. Deactivating DQ2.")
                return False
        return value

    @field_validator("log_dq2")
    @classmethod
    def dq2_log_and_conversion_check(cls, value: bool, info: ValidationInfo):
        column_mappings: list[column_mapping.ColumnMapping] = info.data.get(
            "column_mappings", []
        )
        if len(column_mappings) > 0:
            if any(
                [
                    column_mapping.is_logging_on_convert_error
                    for column_mapping in column_mappings
                ]
            ):
                return value
            else:
                logger.warning(
                    "DQ2 activated but no columns marked for logging."
                    " Deactivating DQ2 logging."
                )
                return False
        return value

    @field_validator("source_tables")
    @classmethod
    def is_active_check(cls, value: list[source_table.SourceTable]):
        if all([not source_table.is_active for source_table in value]):
            raise ValueError("at least one source table must be active.")
        return value

    @staticmethod
    def _read_lookup_instances(
        data: list[dict],
    ) -> tuple[list[lib_lookup.Lookup], list[ValidationError]]:
        instances = []
        errors = []
        for raw_lookup in data:
            try:
                lookup_parameters = [
                    lib_lookup.LookupParameter(**l_parameter)
                    for l_parameter in raw_lookup.pop("lookupParameters", [])
                ]
                return_column_mapping = [
                    lib_lookup.ReturnColumnMapping(**l_rcm)
                    for l_rcm in raw_lookup.pop("returnColumnMappings", [])
                ]
                lookup = lib_lookup.Lookup(
                    **raw_lookup,
                    lookup_parameters=lookup_parameters,
                    return_column_mappings=return_column_mapping,
                )
                instances.append(lookup)
            except ValidationError as e:
                errors.append(e)
        return instances, errors

    @staticmethod
    def _read_source_table_instances(
        data: list[dict],
    ) -> tuple[list[source_table.SourceTable], list[ValidationError]]:
        instances = []
        errors = []
        for st in data:
            try:
                instances.append(source_table.SourceTable(**st))
            except ValidationError as e:
                errors.append(e)
        return instances, errors

    @staticmethod
    def _read_column_mapping_instances(
        data: list[dict],
    ) -> tuple[list[column_mapping.ColumnMapping], list[ValidationError]]:
        instances = []
        errors = []
        for cm in data:
            try:
                instances.append(column_mapping.ColumnMapping(**cm))
            except ValidationError as e:
                errors.append(e)
        return instances, errors

    @classmethod
    def json_object_to_class(
        cls, data: dict
    ) -> tuple[Dataflow | None, list[ValidationError]]:
        errors = []
        lookups, l_errors = cls._read_lookup_instances(data.pop("lookups", []))
        errors += l_errors
        source_tables, st_errors = cls._read_source_table_instances(
            data.pop("tableMappings", [])
        )
        errors += st_errors
        column_mappings, cm_errors = cls._read_column_mapping_instances(
            data.pop("columnMappings", [])
        )
        errors += cm_errors
        try:
            instance = cls(
                **data,
                source_tables=source_tables,
                column_mappings=column_mappings,
                lookups=lookups,
            )
        except ValidationError as e:
            instance = None
            errors.append(e)
        return instance, errors

DatasetType

Bases: WithoutSubfoldersMixin

Base class for loading DatasetType model objects.

Parameters:

id (UUID): Required.
name (str): Required.
storage_format (Literal['CSV', 'Parquet']): Required.
attributes (list | None): Required.

Source code in cloe_metadata/base/repository/dataset_type.py
class DatasetType(WithoutSubfoldersMixin):
    """Base class for loading DatasetType model objects."""

    attribute_used_for_filename: ClassVar[str] = "name"

    id: uuid.UUID
    name: str
    storage_format: Literal["CSV", "Parquet"]
    attributes: list | None

    _check_name = field_validator("name")(validators.name_alphanumeric)

    @property
    def is_parquet(self) -> bool:
        return self.storage_format.lower() == "parquet"

    @property
    def is_csv(self) -> bool:
        return self.storage_format.lower() == "csv"
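
A small sketch of the storage-format helpers (import path assumed):

import uuid

from cloe_metadata.base.repository.dataset_type import DatasetType  # assumed path

dataset_type = DatasetType(
    id=uuid.uuid4(),
    name="ParquetDaily",
    storage_format="Parquet",  # Literal["CSV", "Parquet"]
    attributes=None,
)

assert dataset_type.is_parquet
assert not dataset_type.is_csv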

DatasetTypes

Bases: BaseModel

Base class for loading DatasetType model objects.

Parameters:

dataset_types (list[DatasetType]): Default: []

Source code in cloe_metadata/base/repository/dataset_type.py
class DatasetTypes(BaseModel):
    """Base class for loading DatasetType model objects."""

    subfolder_path: ClassVar[pathlib.Path] = pathlib.Path("repository", "dataset_types")

    dataset_types: list[DatasetType] = Field(default=[])

    _dataset_types_cache: dict[uuid.UUID, DatasetType] = {}
    model_config = ConfigDict(
        populate_by_name=True, alias_generator=writer.to_lower_camel_case
    )

    @field_validator("dataset_types")
    @classmethod
    def child_uniqueness_check(cls, value: list[DatasetType]):
        validators.find_non_unique(value, "name")
        return value

    @classmethod
    def read_instances_from_disk(
        cls, input_path: pathlib.Path
    ) -> tuple[DatasetTypes, list[ValidationError | json.JSONDecodeError]]:
        if not input_path.exists():
            raise FileNotFoundError(f"Directory not found: {input_path}")

        instances, errors = DatasetType.read_instances_from_disk(
            input_path / cls.subfolder_path
        )
        try:
            instance = cls(dataset_types=instances)
        except ValidationError as e:
            errors.append(e)
        return instance, errors

    def write_to_disk(self, output_path: pathlib.Path) -> None:
        for child in self.dataset_types:
            child.write_to_disk(output_path / self.subfolder_path)

    def get_dataset_types(self) -> dict[uuid.UUID, DatasetType]:
        if len(self._dataset_types_cache) < 1:
            self._dataset_types_cache = {
                dataset_types.id: dataset_types for dataset_types in self.dataset_types
            }
        return self._dataset_types_cache

DatatypeTemplate

Bases: WithoutSubfoldersMixin

DatatypeTemplate metadata model base class

Parameters:

source_type (str): Required.
parameter_template (str): Required.

Source code in cloe_metadata/base/modeler/templates/datatype_template.py
class DatatypeTemplate(WithoutSubfoldersMixin):
    """DatatypeTemplate metadata model base class"""

    attribute_used_for_filename: ClassVar[str] = "source_type"

    source_type: str
    parameter_template: str

    _check_name = field_validator("source_type")(validators.name_alphanumeric)

    _check_parameter_template = field_validator("parameter_template")(
        validators.check_if_valid_template
    )

DatatypeTemplates

Bases: BaseModel

Base class for loading CLOE DatatypeTemplate model objects.

Parameters:

datatype_templates (list[DatatypeTemplate]): Default: []

Source code in cloe_metadata/base/modeler/templates/datatype_template.py
class DatatypeTemplates(BaseModel):
    """Base class for loading CLOE DatatypeTemplate model objects."""

    subfolder_path: ClassVar[pathlib.Path] = pathlib.Path(
        "modeler", "templates", "datatype_templates"
    )

    datatype_templates: list[DatatypeTemplate] = []
    _datatype_template_cache: dict[str, DatatypeTemplate] = {}
    model_config = ConfigDict(
        populate_by_name=True, alias_generator=writer.to_lower_camel_case
    )

    @field_validator("datatype_templates")
    @classmethod
    def child_uniqueness_check(cls, value: list[DatatypeTemplate]):
        validators.find_non_unique(value, "source_type")
        return value

    @classmethod
    def read_instances_from_disk(
        cls, input_path: pathlib.Path
    ) -> tuple[DatatypeTemplates, list[ValidationError | json.JSONDecodeError]]:
        if not input_path.exists():
            raise FileNotFoundError(f"Directory not found: {input_path}")

        instances, errors = DatatypeTemplate.read_instances_from_disk(
            input_path / cls.subfolder_path
        )
        try:
            instance = cls(datatype_templates=instances)
        except ValidationError as e:
            errors.append(e)

        return instance, errors

    def write_to_disk(self, output_path: pathlib.Path) -> None:
        for child in self.datatype_templates:
            child.write_to_disk(output_path / self.subfolder_path)

    def get_templates(self) -> dict[str, DatatypeTemplate]:
        if len(self._datatype_template_cache) < 1:
            self._datatype_template_cache = {
                template.source_type: template for template in self.datatype_templates
            }
        return self._datatype_template_cache

ExecSQL

Bases: WithoutSubfoldersMixin

Base class for loading ExecSQL model objects.

Parameters:

id (UUID): ID of the job. Required.
name (str): Name of the job (must be unique). Required.
description (str | None): Description of the job. Default: None
connection_id (UUID): Reference to the sink connection. Will later be used to establish a connection to the stored procedures' database. Required.
queries (list[Query]): All queries belonging to this job. Required.

Source code in cloe_metadata/base/jobs/exec_sql.py
class ExecSQL(WithoutSubfoldersMixin):
    """Base class for loading ExecSQL model objects."""

    attribute_used_for_filename: ClassVar[str] = "name"

    id: uuid.UUID = Field(..., description="ID of the job")
    name: str = Field(..., description="Name of the job(must be unique)")
    description: str | None = Field(default=None, description="Description of the job.")
    connection_id: uuid.UUID = Field(
        ...,
        description="Reference the sink connection. Will later be used to establish a connection to the stored procedures database.",
    )
    queries: list[Query] = Field(..., description="All queries belonging to this job.")

    @field_validator("queries")
    @classmethod
    def runtimes_order_by_unique_check(cls, value: list[Query]):
        order_number = []
        for query in value:
            if query.exec_order not in order_number:
                order_number.append(query.exec_order)
            else:
                raise ValueError("Queries exec_order not unique")
        return value

    @classmethod
    def json_object_to_class(
        cls, data: dict
    ) -> tuple[ExecSQL | None, list[ValidationError]]:
        errors = []
        queries = []
        for query in data.pop("queries", []):
            try:
                queries.append(Query(**query))
            except ValidationError as e:
                errors.append(e)
        try:
            instance = cls(**data, queries=queries)
        except ValidationError as e:
            instance = None
            errors.append(e)
        return instance, errors

FS2DB

Bases: WithoutSubfoldersMixin

Base class for loading FS2DB model objects.

Parameters:

id (UUID): ID of the job. Required.
name (str): Name of the job (must be unique). Required.
description (str | None): Description of the job. Default: None
sink_connection_id (UUID): Reference to the sink connection. Will be used to establish a connection to the database. Required.
source_connection_id (UUID): Reference to the source connection. Will be used to establish a connection to the bucket/blob. Required.
container_name (str): Name of the bucket/blob where the files are located. Required.
filename_pattern (str): Pattern by which the filenames on the blob/bucket or FileCatalog are filtered. Required.
folder_path_pattern (str): Pattern by which the path on the blob/bucket or FileCatalog is filtered. Required.
sink_table_id (UUID): Reference to the sink table the data should be loaded into. Required.
dataset_type_id (UUID): Reference to a datasettype, which is partly used to generate the filename and to define the target filetype (e.g. Parquet). Required.
get_from_filecatalog (bool): Whether a FileCatalog is used to coordinate file loading. Default: False
post_load_exec_job_id (UUID | None): Exec job to be executed after successfully loading a file. Default: None

Source code in cloe_metadata/base/jobs/fs2db.py
class FS2DB(WithoutSubfoldersMixin):
    """Base class for loading FS2DB model objects."""

    attribute_used_for_filename: ClassVar[str] = "name"

    id: uuid.UUID = Field(..., description="ID of the job")
    name: str = Field(..., description="Name of the job(must be unique)")
    description: str | None = Field(default=None, description="Description of the job.")
    sink_connection_id: uuid.UUID = Field(
        ...,
        description="Reference the sink connection. Will be used to establish a connection to the database.",
    )
    source_connection_id: uuid.UUID = Field(
        ...,
        description="Reference the sink connection. Will be used to establish a connection to the bucket/blob.",
    )
    container_name: str = Field(
        ..., description="Name of the bucket/blob where the file are located."
    )
    filename_pattern: str = Field(
        ...,
        description="Pattern by which the filenames on Blob/Bucket or FileCatalog should be filtered.",
    )
    folder_path_pattern: str = Field(
        ...,
        description="Pattern by which the path on Blob/Bucket or FileCatalog should be filtered..",
    )
    sink_table_id: uuid.UUID = Field(
        ..., description="Reference to the sink table where data should be loaded to."
    )
    dataset_type_id: uuid.UUID = Field(
        ...,
        description="Reference a datasettype which is partly used to generate the filename and to define the target filetype(e.g. Parquet).",
    )
    get_from_filecatalog: bool = Field(
        default=False,
        description="If a filecatalog is used to corrdinator file loading.",
    )
    post_load_exec_job_id: uuid.UUID | None = Field(
        default=None,
        description="Exec Job to be executed after successful loading a file.",
    )

    _check_filename_pattern = field_validator("filename_pattern")(
        validators.check_if_valid_template
    )

    _check_folder_path_pattern = field_validator("folder_path_pattern")(
        validators.check_if_valid_template
    )

Flows

Bases: BaseModel

Base class for loading CLOE Pipe model objects.

Parameters:

dataflows (list[Union[CustomDataflow, Dataflow]]): Default: []

Source code in cloe_metadata/base/modeler/flows.py
class Flows(BaseModel):
    """Base class for loading CLOE Pipe model objects."""

    subfolder_path: ClassVar[pathlib.Path] = pathlib.Path("modeler")
    child_to_folder_name: ClassVar[dict[str, str]] = {
        dataflow.Dataflow.__name__: "dataflows",
        custom_dataflow.CustomDataflow.__name__: "custom_dataflows",
    }

    dataflows: list[custom_dataflow.CustomDataflow | dataflow.Dataflow] = Field(
        default=[]
    )
    model_config = ConfigDict(
        populate_by_name=True,
        alias_generator=writer.to_lower_camel_case,
    )

    @field_validator("dataflows")
    @classmethod
    def child_uniqueness_check(
        cls, value: list[custom_dataflow.CustomDataflow | dataflow.Dataflow]
    ):
        validators.find_non_unique(value, "name")
        return value

    @classmethod
    def read_instances_from_disk(
        cls, input_path: pathlib.Path
    ) -> tuple[Flows, list[ValidationError | json.JSONDecodeError]]:
        instances: list[custom_dataflow.CustomDataflow | dataflow.Dataflow] = []
        errors = []
        if not input_path.exists():
            raise FileNotFoundError(f"Directory not found: {input_path}")

        instances_path = input_path / cls.subfolder_path
        sub_pipes: list[
            type[custom_dataflow.CustomDataflow] | type[dataflow.Dataflow]
        ] = [
            custom_dataflow.CustomDataflow,
            dataflow.Dataflow,
        ]
        for sub_pipe in sub_pipes:
            sub_instances, sub_errors = sub_pipe.read_instances_from_disk(
                instances_path / cls.child_to_folder_name[sub_pipe.__name__]
            )
            instances += sub_instances
            errors += sub_errors
        try:
            instance = cls(dataflows=instances)
        except ValidationError as e:
            errors.append(e)

        return instance, errors

    def write_to_disk(self, output_path: pathlib.Path) -> None:
        for child in self.dataflows:
            child.write_to_disk(
                output_path
                / self.subfolder_path
                / self.child_to_folder_name[child.__class__.__name__]
            )

    def get_dataflows(
        self,
    ) -> list[dataflow.Dataflow]:
        """
        Filters the pipes list based on the given pipe type.
        """
        return [pipe for pipe in self.dataflows if isinstance(pipe, dataflow.Dataflow)]

    def get_custom_dataflows(
        self,
    ) -> list[custom_dataflow.CustomDataflow]:
        """
        Filters the pipes list based on the given pipe type.
        """
        return [
            pipe
            for pipe in self.dataflows
            if isinstance(pipe, custom_dataflow.CustomDataflow)
        ]

get_custom_dataflows()

Filters the pipes list based on the given pipe type.

Source code in cloe_metadata/base/modeler/flows.py
def get_custom_dataflows(
    self,
) -> list[custom_dataflow.CustomDataflow]:
    """
    Filters the pipes list based on the given pipe type.
    """
    return [
        pipe
        for pipe in self.dataflows
        if isinstance(pipe, custom_dataflow.CustomDataflow)
    ]

get_dataflows()

Filters the pipes list based on the given pipe type.

Source code in cloe_metadata/base/modeler/flows.py
def get_dataflows(
    self,
) -> list[dataflow.Dataflow]:
    """
    Filters the pipes list based on the given pipe type.
    """
    return [pipe for pipe in self.dataflows if isinstance(pipe, dataflow.Dataflow)]
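
A sketch of loading all dataflows from disk and splitting them by type, assuming a model/ repository laid out with modeler/dataflows/ and modeler/custom_dataflows/ subfolders (import path assumed):

import pathlib

from cloe_metadata.base.modeler.flows import Flows  # assumed import path

flows, errors = Flows.read_instances_from_disk(pathlib.Path("model"))

standard_flows = flows.get_dataflows()       # only Dataflow instances
custom_flows = flows.get_custom_dataflows()  # only CustomDataflow instances
print(len(standard_flows), len(custom_flows))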

Jobs

Bases: BaseModel

Base class for loading Job model objects.

Parameters:

jobs (list[Union[DB2FS, FS2DB, ExecSQL]]): Default: []

Source code in cloe_metadata/base/jobs/jobs.py
class Jobs(BaseModel):
    """Base class for loading Job model objects."""

    subfolder_path: ClassVar[pathlib.Path] = pathlib.Path("jobs")
    child_to_folder_name: ClassVar[dict[str, str]] = {
        DB2FS.__name__: "db2fs",
        FS2DB.__name__: "fs2db",
        ExecSQL.__name__: "execsql",
    }

    jobs: list[DB2FS | FS2DB | ExecSQL] = Field(default=[])
    _jobs_cache: dict[uuid.UUID, DB2FS | FS2DB | ExecSQL] = {}
    model_config = ConfigDict(
        populate_by_name=True, alias_generator=writer.to_lower_camel_case
    )

    @field_validator("jobs")
    @classmethod
    def child_uniqueness_check(cls, value: list[DB2FS | FS2DB | ExecSQL]):
        validators.find_non_unique(value, "name")
        return value

    @classmethod
    def read_instances_from_disk(
        cls, input_path: pathlib.Path
    ) -> tuple[Jobs, list[ValidationError | json.JSONDecodeError]]:
        instances: list[DB2FS | FS2DB | ExecSQL] = []
        errors = []
        if not input_path.exists():
            raise FileNotFoundError(f"Directory not found: {input_path}")

        instances_path = input_path / cls.subfolder_path
        sub_jobs: list[type[DB2FS] | type[FS2DB] | type[ExecSQL]] = [
            DB2FS,
            FS2DB,
            ExecSQL,
        ]
        for sub_job in sub_jobs:
            sub_instances, sub_errors = sub_job.read_instances_from_disk(
                instances_path / cls.child_to_folder_name[sub_job.__name__]
            )
            instances += sub_instances
            errors += sub_errors
        try:
            instance = cls(jobs=instances)
        except ValidationError as e:
            errors.append(e)

        return instance, errors

    def write_to_disk(self, output_path: pathlib.Path) -> None:
        for child in self.jobs:
            child.write_to_disk(
                output_path
                / self.subfolder_path
                / self.child_to_folder_name[child.__class__.__name__]
            )

    def get_jobs(self) -> dict[uuid.UUID, DB2FS | FS2DB | ExecSQL]:
        if len(self._jobs_cache) < 1:
            self._jobs_cache = {jobs.id: jobs for jobs in self.jobs}
        return self._jobs_cache

    def get_db2fs_jobs(
        self,
    ) -> list[DB2FS]:
        """
        Filters the jobs list

        :return: List of jobs of the specified type
        """
        return [job for job in self.jobs if isinstance(job, DB2FS)]

    def get_fs2db_jobs(
        self,
    ) -> list[FS2DB]:
        """
        Filters the jobs list

        :return: List of jobs of the specified type
        """
        return [job for job in self.jobs if isinstance(job, FS2DB)]

    def get_exec_sql_jobs(
        self,
    ) -> list[ExecSQL]:
        """
        Filters the jobs list

        :return: List of jobs of the specified type
        """
        return [job for job in self.jobs if isinstance(job, ExecSQL)]

get_db2fs_jobs()

Filters the jobs list

:return: List of jobs of the specified type

Source code in cloe_metadata/base/jobs/jobs.py
def get_db2fs_jobs(
    self,
) -> list[DB2FS]:
    """
    Filters the jobs list

    :return: List of jobs of the specified type
    """
    return [job for job in self.jobs if isinstance(job, DB2FS)]

get_exec_sql_jobs()

Filters the jobs list

:return: List of jobs of the specified type

Source code in cloe_metadata/base/jobs/jobs.py
def get_exec_sql_jobs(
    self,
) -> list[ExecSQL]:
    """
    Filters the jobs list

    :return: List of jobs of the specified type
    """
    return [job for job in self.jobs if isinstance(job, ExecSQL)]

get_fs2db_jobs()

Filters the jobs list

:return: List of jobs of the specified type

Source code in cloe_metadata/base/jobs/jobs.py
def get_fs2db_jobs(
    self,
) -> list[FS2DB]:
    """
    Filters the jobs list

    :return: List of jobs of the specified type
    """
    return [job for job in self.jobs if isinstance(job, FS2DB)]
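
Taken together, a minimal usage sketch for the Jobs collection might look like the following. The metadata root ./metadata is a hypothetical path and is assumed to contain the jobs/ subfolder with db2fs/, fs2db/ and execsql/ beneath it, as read_instances_from_disk expects.

import pathlib

from cloe_metadata.base.jobs.jobs import Jobs

# Hypothetical metadata root; <root>/jobs/{db2fs,fs2db,execsql} must exist.
jobs_model, errors = Jobs.read_instances_from_disk(pathlib.Path("./metadata"))

if errors or jobs_model is None:
    for error in errors:
        print(error)
else:
    db2fs_jobs = jobs_model.get_db2fs_jobs()        # DB2FS jobs only
    exec_sql_jobs = jobs_model.get_exec_sql_jobs()  # ExecSQL jobs only
    jobs_by_id = jobs_model.get_jobs()              # cached id -> job lookup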

SQLTemplate

Bases: WithoutSubfoldersMixin

SQLTemplate metadata model base class

Parameters:

Name Type Description Default
id int
required
name str
required
template str
required
description str | None
None
Source code in cloe_metadata/base/modeler/templates/sql_template.py
class SQLTemplate(WithoutSubfoldersMixin):
    """SQLTemplate metadata model base class"""

    attribute_used_for_filename: ClassVar[str] = "name"

    id: int
    name: str
    template: str
    description: str | None = None

    _check_name = field_validator("name")(validators.name_alphanumeric)

    _check_template = field_validator("template")(validators.check_if_valid_template)

SQLTemplates

Bases: BaseModel

Base class for loading CLOE SQLTemplate model objects.

Parameters:

Name Type Description Default
sql_templates list[SQLTemplate]
[]
Source code in cloe_metadata/base/modeler/templates/sql_template.py
class SQLTemplates(BaseModel):
    """Base class for loading CLOE SQLTemplate model objects."""

    subfolder_path: ClassVar[pathlib.Path] = pathlib.Path(
        "modeler", "templates", "sql_templates"
    )

    sql_templates: list[SQLTemplate] = []
    _sql_template_cache: dict[int, SQLTemplate] = {}
    model_config = ConfigDict(
        populate_by_name=True, alias_generator=writer.to_lower_camel_case
    )

    @field_validator("sql_templates")
    @classmethod
    def child_uniqueness_check(cls, value: list[SQLTemplate]):
        validators.find_non_unique(value, "name")
        return value

    @classmethod
    def read_instances_from_disk(
        cls, input_path: pathlib.Path
    ) -> tuple[SQLTemplates | None, list[ValidationError | json.JSONDecodeError]]:
        if not input_path.exists():
            raise FileNotFoundError(f"Directory not found: {input_path}")

        instances, errors = SQLTemplate.read_instances_from_disk(
            input_path / cls.subfolder_path
        )
        try:
            instance = cls(sql_templates=instances)
        except ValidationError as e:
            # Top-level validation failed; return None alongside the collected errors.
            instance = None
            errors.append(e)

        return instance, errors

    def write_to_disk(self, output_path: pathlib.Path) -> None:
        for child in self.sql_templates:
            child.write_to_disk(output_path / self.subfolder_path)

    def get_templates(self) -> dict[int, SQLTemplate]:
        if len(self._sql_template_cache) < 1:
            self._sql_template_cache = {
                template.id: template for template in self.sql_templates
            }
        return self._sql_template_cache
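
The collection can also be assembled in memory and queried through get_templates. A minimal sketch; the template body below is a hypothetical Jinja-style snippet and is only assumed to pass the check_if_valid_template validator.

from cloe_metadata.base.modeler.templates.sql_template import (
    SQLTemplate,
    SQLTemplates,
)

# Hypothetical template; the body is assumed to satisfy check_if_valid_template.
simple_select = SQLTemplate(
    id=1,
    name="SimpleSelect",
    template="SELECT * FROM {{ source_table }}",
    description="Select everything from a source table.",
)

templates = SQLTemplates(sql_templates=[simple_select])
print(templates.get_templates()[1].name)  # cached lookup by id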

Schema

Bases: WithSubfoldersMixin

Base class for loading Schema model objects.

Parameters:

Name Type Description Default
id UUID

ID of the schema

UUID('e236167c-639a-4205-892a-01b61d772fb2')
name str

Name of the schema

required
tables list[Table]
[]
Source code in cloe_metadata/base/repository/database/schema.py
class Schema(WithSubfoldersMixin):
    """Base class for loading Schema model objects."""

    subfolder_path: ClassVar[pathlib.Path] = pathlib.Path("tables")
    exclude_when_writing_to_disk: ClassVar[set] = {"tables"}
    attribute_used_for_filename: ClassVar[str] = "name"

    id: uuid.UUID = Field(default_factory=uuid.uuid4, description="ID of the schema")
    name: str = Field(..., description="Name of the schema")
    tables: list[table.Table] = []

    @field_validator("tables")
    @classmethod
    def child_uniqueness_check(cls, value: list[table.Table]):
        validators.find_non_unique(value, "name")
        return value

    @classmethod
    def json_object_to_class(
        cls, data: dict, instance_dir: pathlib.Path
    ) -> tuple[Schema | None, list[ValidationError | json.JSONDecodeError]]:
        instance_folderpath = instance_dir / cls.subfolder_path
        tables, sub_errors = table.Table.read_instances_from_disk(instance_folderpath)
        try:
            instance = cls(**data, tables=tables)
        except ValidationError as e:
            instance = None
            sub_errors.append(e)
        return instance, sub_errors

    def _write_childs_to_disk(self, sub_output_path: pathlib.Path) -> None:
        for child in self.tables:
            child.write_to_disk(sub_output_path / self.subfolder_path)
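
For orientation, a Schema can also be built in memory from Table children; the child_uniqueness_check validator rejects duplicate table names. The schema and table names below are hypothetical and assumed to pass the respective name validators.

import uuid

from cloe_metadata.base.repository.database import schema, table

customer = table.Table(id=uuid.uuid4(), name="customer", level="core")
orders = table.Table(id=uuid.uuid4(), name="orders", level="core")

core_schema = schema.Schema(name="core", tables=[customer, orders])
print([t.name for t in core_schema.tables])  # ['customer', 'orders']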

Sourcesystem

Bases: WithoutSubfoldersMixin

Base class for loading Sourcesystem model objects.

Parameters:

Name Type Description Default
id UUID
required
name str
required
Source code in cloe_metadata/base/repository/sourcesystem.py
class Sourcesystem(WithoutSubfoldersMixin):
    """Base class for loading Sourcesystem model objects."""

    attribute_used_for_filename: ClassVar[str] = "name"

    id: uuid.UUID
    name: str

    _check_name = field_validator("name")(validators.name_alphanumeric)

Sourcesystems

Bases: BaseModel

Base class for loading Sourcesystem model objects.

Parameters:

Name Type Description Default
sourcesystems list[Sourcesystem]
[]
Source code in cloe_metadata/base/repository/sourcesystem.py
class Sourcesystems(BaseModel):
    """Base class for loading Sourcesystem model objects."""

    subfolder_path: ClassVar[pathlib.Path] = pathlib.Path("repository", "sourcesystems")

    sourcesystems: list[Sourcesystem] = Field(default=[])

    _sourcesystems_cache: dict[uuid.UUID, Sourcesystem] = {}
    model_config = ConfigDict(
        populate_by_name=True, alias_generator=writer.to_lower_camel_case
    )

    @field_validator("sourcesystems")
    @classmethod
    def child_uniqueness_check(cls, value: list[Sourcesystem]):
        validators.find_non_unique(value, "name")
        return value

    def get_sourcesystems(self) -> dict[uuid.UUID, Sourcesystem]:
        if len(self._sourcesystems_cache) < 1:
            self._sourcesystems_cache = {
                sourcesystems.id: sourcesystems for sourcesystems in self.sourcesystems
            }
        return self._sourcesystems_cache

    @classmethod
    def read_instances_from_disk(
        cls, input_path: pathlib.Path
    ) -> tuple[Sourcesystems | None, list[ValidationError | json.JSONDecodeError]]:
        if not input_path.exists():
            raise FileNotFoundError(f"Directory not found: {input_path}")

        instances, errors = Sourcesystem.read_instances_from_disk(
            input_path / cls.subfolder_path
        )
        try:
            instance = cls(sourcesystems=instances)
        except ValidationError as e:
            # Top-level validation failed; return None alongside the collected errors.
            instance = None
            errors.append(e)

        return instance, errors

    def write_to_disk(self, output_path: pathlib.Path) -> None:
        for child in self.sourcesystems:
            child.write_to_disk(output_path / self.subfolder_path)
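
A short in-memory sketch of the cached lookup; the source system names are hypothetical and assumed to pass the alphanumeric name check.

import uuid

from cloe_metadata.base.repository.sourcesystem import Sourcesystem, Sourcesystems

systems = Sourcesystems(
    sourcesystems=[
        Sourcesystem(id=uuid.uuid4(), name="SAP"),
        Sourcesystem(id=uuid.uuid4(), name="Salesforce"),
    ]
)

# The first call builds the id -> Sourcesystem dictionary; later calls reuse it.
by_id = systems.get_sourcesystems()
print(len(by_id))  # 2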

Table

Bases: WithoutSubfoldersMixin

Base class for loading Table model objects.

Parameters:

Name Type Description Default
id UUID

ID of the table

required
level Literal['src', 'stg', 'ver', 'core', 'derived', 'lu'] | None

Level of the table

None
name str

Name of the Table

required
columns list[Annotated[Column, AfterValidator]]
[]
Source code in cloe_metadata/base/repository/database/table.py
class Table(WithoutSubfoldersMixin):
    """Base class for loading Table model objects."""

    attribute_used_for_filename: ClassVar[str] = "name"

    id: uuid.UUID = Field(..., description="ID of the table")
    level: Literal["src", "stg", "ver", "core", "derived", "lu"] | None = Field(
        default=None, description="Level of the table"
    )
    name: str = Field(..., description="Name of the Table")
    columns: list[
        Annotated[
            column.Column, AfterValidator(validators.name_alphanumeric_table_columns)
        ]
    ] = []

    _check_name = field_validator("name")(validators.name_alphanumeric_table_name)

    @property
    def is_version(self) -> bool:
        return self.level == "ver"

    @classmethod
    def json_object_to_class(
        cls, data: dict
    ) -> tuple[Table | None, list[ValidationError]]:
        errors = []
        columns = []
        for cm in data.pop("columns", []):
            try:
                columns.append(column.Column(**cm))
            except ValidationError as e:
                errors.append(e)
        try:
            instance = cls(**data, columns=columns)
        except ValidationError as e:
            instance = None
            errors.append(e)
        return instance, errors

    @classmethod
    def read_instances_from_disk(
        cls, input_path: pathlib.Path, fail_on_missing_subfolder: bool = True
    ) -> tuple[list[Table], list[ValidationError | json.JSONDecodeError]]:
        # Read with fail_on_missing_subfolder=False so a missing tables folder does not raise.
        return super().read_instances_from_disk(input_path, False)
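
Two behaviours are worth illustrating: json_object_to_class collects per-column validation errors instead of failing on the first one, and is_version is a convenience check on the level field. A minimal sketch with hypothetical data (columns omitted, since their required fields are defined in the Column model); the table name is assumed to pass the table name validator.

import uuid

from cloe_metadata.base.repository.database.table import Table

data = {"id": str(uuid.uuid4()), "name": "customer_ver", "level": "ver"}

# Returns (instance, errors); instance is None only if the table itself is invalid.
instance, errors = Table.json_object_to_class(data)
if instance is not None:
    print(instance.is_version)  # True, because level == "ver"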

Tenant

Bases: WithoutSubfoldersMixin

Base class for loading Tenant model objects.

Parameters:

Name Type Description Default
id UUID
required
name str
required
Source code in cloe_metadata/base/repository/tenant.py
class Tenant(WithoutSubfoldersMixin):
    """Base class for loading Tenant model objects."""

    attribute_used_for_filename: ClassVar[str] = "name"

    id: uuid.UUID
    name: str

    _check_name = field_validator("name")(validators.name_alphanumeric)

Tenants

Bases: BaseModel

Base class for loading Tenant model objects.

Parameters:

Name Type Description Default
tenants list[Tenant]
[]
Source code in cloe_metadata/base/repository/tenant.py
class Tenants(BaseModel):
    """Base class for loading Tenant model objects."""

    subfolder_path: ClassVar[pathlib.Path] = pathlib.Path("repository", "tenants")

    tenants: list[Tenant] = Field(default=[])

    _tenants_cache: dict[uuid.UUID, Tenant] = {}
    model_config = ConfigDict(
        populate_by_name=True, alias_generator=writer.to_lower_camel_case
    )

    @field_validator("tenants")
    @classmethod
    def child_uniqueness_check(cls, value: list[Tenant]):
        validators.find_non_unique(value, "name")
        return value

    @classmethod
    def read_instances_from_disk(
        cls, input_path: pathlib.Path
    ) -> tuple[Tenants | None, list[ValidationError | json.JSONDecodeError]]:
        if not input_path.exists():
            raise FileNotFoundError(f"Directory not found: {input_path}")

        instances, errors = Tenant.read_instances_from_disk(
            input_path / cls.subfolder_path
        )
        try:
            instance = cls(tenants=instances)
        except ValidationError as e:
            # Top-level validation failed; return None alongside the collected errors.
            instance = None
            errors.append(e)

        return instance, errors

    def write_to_disk(self, output_path: pathlib.Path) -> None:
        for child in self.tenants:
            child.write_to_disk(output_path / self.subfolder_path)

    def get_tenants(self) -> dict[uuid.UUID, Tenant]:
        if len(self._tenants_cache) < 1:
            self._tenants_cache = {tenants.id: tenants for tenants in self.tenants}
        return self._tenants_cache
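
Finally, the collection classes are symmetric with respect to disk I/O: write_to_disk places each child under repository/tenants and read_instances_from_disk loads them back. A minimal round-trip sketch; the output directory and tenant name are hypothetical, and the target folders are assumed to be created by (or to already exist for) the child write_to_disk call.

import pathlib
import uuid

from cloe_metadata.base.repository.tenant import Tenant, Tenants

tenants = Tenants(tenants=[Tenant(id=uuid.uuid4(), name="tenantA")])

out_dir = pathlib.Path("./metadata")
tenants.write_to_disk(out_dir)  # writes into ./metadata/repository/tenants

reloaded, errors = Tenants.read_instances_from_disk(out_dir)
print(list(reloaded.get_tenants()) if reloaded else errors)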