Skip to content

Shared

The shared metadata objects are the second layer of metadata. Whether they are loaded depends on the package that uses them. They run advanced validation and consistency checks, and they add advanced functionality to metadata fields, such as Jinja templating.

Jobs

DB2FS

Bases: BaseModel

Class for advanced or shared DB2FS functionality.

Parameters:

Name Type Description Default
base_obj DB2FS
required
dataset_types DatasetTypes
required
databases Databases
required
data_source_infos dict[UUID, DataSourceInfo]
required
connections Connections
required
Source code in cloe_metadata/shared/jobs/db2fs.py
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
class DB2FS(BaseModel):
    """Class for advanced or shared DB2FS functionality.

    Wraps a base ``DB2FS`` job together with the metadata collections it
    references. Field validators check that every ID referenced by
    ``base_obj`` exists in the matching collection; properties resolve those
    IDs to objects and render the job's Jinja templates.
    """

    base_obj: base.DB2FS
    dataset_types: base.DatasetTypes = Field(..., exclude=True)
    databases: base.Databases = Field(..., exclude=True)
    data_source_infos: dict[uuid.UUID, repository.DataSourceInfo] = Field(
        ..., exclude=True
    )
    # NOTE(review): unlike the other lookup collections in this model (and
    # unlike ``connections`` on FS2DB/ExecSQL) this field is not declared with
    # ``exclude=True``, so it is included when the model is serialized.
    # Confirm whether that is intentional.
    connections: base.Connections

    # The validators below read ``base_obj`` from ``info.data``, which only
    # holds fields validated so far. When ``base_obj`` is missing there, the
    # cross-reference checks are skipped (hence the ``None`` checks).

    @field_validator("dataset_types")
    @classmethod
    def dataset_type_exists(cls, value: base.DatasetTypes, info: ValidationInfo):
        """Validates that the dataset type exists.

        Raises:
            ValueError: If ``base_obj.dataset_type_id`` is not in
                ``value.get_dataset_types()``.
        """
        base_obj: base.DB2FS | None = info.data.get("base_obj")
        if (
            base_obj is not None
            and base_obj.dataset_type_id not in value.get_dataset_types()
        ):
            raise ValueError("dataset_type_id does not exist")
        return value

    @field_validator("data_source_infos")
    @classmethod
    def data_source_infos_exists(
        cls, value: dict[uuid.UUID, repository.DataSourceInfo], info: ValidationInfo
    ):
        """Validates that the data source info exists.

        Raises:
            ValueError: If ``base_obj.datasource_info_id`` is not a key of
                ``value``.
        """
        base_obj: base.DB2FS | None = info.data.get("base_obj")
        if base_obj is not None and base_obj.datasource_info_id not in value:
            raise ValueError("datasource_info_id not in datasource_infos")
        return value

    @field_validator("databases")
    @classmethod
    def tables_exists(cls, value: base.Databases, info: ValidationInfo):
        """Validates that the source table exists in the databases.

        Raises:
            ValueError: If ``base_obj.source_table_id`` is not a key of
                ``value.id_to_tables``.
        """
        base_obj: base.DB2FS | None = info.data.get("base_obj")
        if base_obj is not None and base_obj.source_table_id not in value.id_to_tables:
            raise ValueError("id not in tables")
        return value

    @field_validator("connections")
    @classmethod
    def sink_connection_exists(cls, value: base.Connections, info: ValidationInfo):
        """Validates that the sink and source connections exist in the connections.

        Raises:
            ValueError: Naming every missing ID, e.g.
                ``"sink_connection_id, source_connection_id not in connections"``.
        """
        base_obj: base.DB2FS | None = info.data.get("base_obj")
        if base_obj is None:
            return value
        known_connections = value.get_connections()
        # Collect every missing reference so a single error reports all of them.
        missing = [
            field_name
            for field_name, connection_id in (
                ("sink_connection_id", base_obj.sink_connection_id),
                ("source_connection_id", base_obj.source_connection_id),
            )
            if connection_id not in known_connections
        ]
        if missing:
            raise ValueError(f"{', '.join(missing)} not in connections")
        return value

    @property
    def source_connection(self) -> base.Connection:
        """Returns the source connection object."""
        return self.connections.get_connections()[self.base_obj.source_connection_id]

    @property
    def sink_connection(self) -> base.Connection:
        """Returns the sink connection object."""
        return self.connections.get_connections()[self.base_obj.sink_connection_id]

    @property
    def source_table(self) -> base.Table:
        """Returns the source table object."""
        return self.databases.id_to_tables[self.base_obj.source_table_id]

    @property
    def dataset_type(self) -> base.DatasetType:
        """Returns the dataset type object."""
        return self.dataset_types.get_dataset_types()[self.base_obj.dataset_type_id]

    @property
    def data_source_info(self) -> repository.DataSourceInfo:
        """Returns the data source info object."""
        return self.data_source_infos[self.base_obj.datasource_info_id]

    @property
    def rendered_folder_path(self) -> str:
        """
        Renders and returns the folder path using Jinja2 templates.

        If ``base_obj.folder_path`` is None, the source system name is returned
        unrendered.

        The following variables can be used in the Jinja template:

        - `content`: The content of the data source info's base object.
        - `sourcesystem_name`: The name of the source system.
        - `tenant`: The name of the tenant (None if the data source info has no tenant).
        - `object_description`: The description of the base object.
        - `ds_type_name`: The name of the dataset type.

        Example Jinja template:
        ```
        {% if tenant %}
            /{{ tenant }}/{{ sourcesystem_name }}/{{ ds_type_name }}/
        {% else %}
            /{{ sourcesystem_name }}/{{ ds_type_name }}/
        {% endif %}
        ```

        Returns:
            str: The rendered folder path.
        """
        if self.base_obj.folder_path is None:
            return self.data_source_info.sourcesystem.name
        tenant_name = None
        if self.data_source_info.tenant is not None:
            tenant_name = self.data_source_info.tenant.name
        return env.from_string(self.base_obj.folder_path).render(
            content=self.data_source_info.base_obj.content,
            sourcesystem_name=self.data_source_info.sourcesystem.name,
            tenant=tenant_name,
            object_description=self.data_source_info.base_obj.object_description,
            ds_type_name=self.dataset_type.name,
        )

    @property
    def rendered_select_query(self) -> str:
        """
        Renders and returns the select query using Jinja2 templates.

        The following variables can be used in the Jinja template:

        - `source_table_identifier`: The identifier for the source table,
          rendered from the `object_identifier.sql.j2` template.
        - `source_table`: The source table object.
        - `source_columns`: The columns of the source table.
        - `source_sourcesystem_name`: The name of the source system.
        - `source_datasettype_name`: The name of the dataset type.
        - `source_datasettype_type`: The storage format of the dataset type.
        - `sequence_column_name`: The name of the sequence column.

        Example Jinja template:
        ```
        SELECT {{ source_columns | join(', ') }}
        FROM {{ source_table_identifier }}
        WHERE {{ sequence_column_name }} > :last_sequence_value
        ```

        Returns:
            str: The rendered select query.
        """
        source_schema, source_table_obj = self.databases.get_table_and_schema(
            self.base_obj.source_table_id
        )
        source_table_identifier = env.get_template("object_identifier.sql.j2").render(
            connection=self.source_connection,
            schema_obj=source_schema,
            table_obj=source_table_obj,
        )
        # Resolve the dataset type once instead of looking it up per attribute.
        dataset_type = self.dataset_type
        return env.from_string(self.base_obj.select_statement).render(
            source_table_identifier=source_table_identifier,
            source_table=self.source_table,
            source_columns=self.source_table.columns,
            source_sourcesystem_name=self.data_source_info.sourcesystem.name,
            source_datasettype_name=dataset_type.name,
            source_datasettype_type=dataset_type.storage_format,
            sequence_column_name=self.base_obj.sequence_column_name,
        )

data_source_info: repository.DataSourceInfo property

Returns the data source info object.

dataset_type: base.DatasetType property

Returns the dataset type object.

rendered_folder_path: str property

Renders and returns the folder path using Jinja2 templates.

The following variables can be used in the Jinja template:

  • content: The content of the data source info's base object.
  • sourcesystem_name: The name of the source system.
  • tenant: The name of the tenant (if available).
  • object_description: The description of the base object.
  • ds_type_name: The name of the dataset type.

Example Jinja template:

{% if tenant %}
    /{{ tenant }}/{{ sourcesystem_name }}/{{ ds_type_name }}/
{% else %}
    /{{ sourcesystem_name }}/{{ ds_type_name }}/
{% endif %}

Returns:

Name Type Description
str str

The rendered folder path.

rendered_select_query: str property

Renders and returns the select query using Jinja2 templates.

The following variables can be used in the Jinja template:

  • source_table_identifier: The identifier for the source table.
  • source_table: The source table object.
  • source_columns: The columns of the source table.
  • source_sourcesystem_name: The name of the source system.
  • source_datasettype_name: The name of the dataset type.
  • source_datasettype_type: The storage format of the dataset type.
  • sequence_column_name: The name of the sequence column.

Example Jinja template:

SELECT {{ source_columns | join(', ') }}
FROM {{ source_table_identifier }}
WHERE {{ sequence_column_name }} > :last_sequence_value

Returns:

Name Type Description
str str

The rendered select query.

sink_connection: base.Connection property

Returns the sink connection object.

source_connection: base.Connection property

Returns the source connection object.

source_table: base.Table property

Returns the source table object.

data_source_infos_exists(value, info) classmethod

Validates that the data source info exists.

Source code in cloe_metadata/shared/jobs/db2fs.py
36
37
38
39
40
41
42
43
44
45
@field_validator("data_source_infos")
@classmethod
def data_source_infos_exists(
    cls, value: dict[uuid.UUID, repository.DataSourceInfo], info: ValidationInfo
):
    """Ensure the job's ``datasource_info_id`` is a key of ``value``.

    The check is skipped when ``base_obj`` is not present in ``info.data``.
    On success ``value`` is returned unchanged.
    """
    job: base.DB2FS | None = info.data.get("base_obj")
    if job is None or job.datasource_info_id in value:
        return value
    raise ValueError("datasource_info_id not in datasource_infos")

dataset_type_exists(value, info) classmethod

Validates that the dataset type exists.

Source code in cloe_metadata/shared/jobs/db2fs.py
24
25
26
27
28
29
30
31
32
33
34
@field_validator("dataset_types")
@classmethod
def dataset_type_exists(cls, value: base.DatasetTypes, info: ValidationInfo):
    """Ensure the job's ``dataset_type_id`` is one of ``value.get_dataset_types()``.

    The check is skipped when ``base_obj`` is not present in ``info.data``.
    On success ``value`` is returned unchanged.
    """
    job: base.DB2FS | None = info.data.get("base_obj")
    if job is None:
        return value
    known_types = value.get_dataset_types()
    if job.dataset_type_id not in known_types:
        raise ValueError("dataset_type_id does not exist")
    return value

sink_connection_exists(value, info) classmethod

Validates that both the sink connection and the source connection exist in the connections.

Source code in cloe_metadata/shared/jobs/db2fs.py
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
@field_validator("connections")
@classmethod
def sink_connection_exists(cls, value: base.Connections, info: ValidationInfo):
    """Validates that the sink and source connections exist in the connections.

    Args:
        value: The connections collection being validated.
        info: Pydantic validation context; ``info.data`` holds the fields
            validated so far, including ``base_obj`` when it was valid.

    Raises:
        ValueError: Naming every missing ID, e.g.
            ``"sink_connection_id, source_connection_id not in connections"``.

    Returns:
        The unchanged ``value``.
    """
    base_obj: base.DB2FS | None = info.data.get("base_obj")
    if base_obj is None:
        return value
    known_connections = value.get_connections()
    # Collect every missing reference so a single error reports all of them.
    missing = [
        field_name
        for field_name, connection_id in (
            ("sink_connection_id", base_obj.sink_connection_id),
            ("source_connection_id", base_obj.source_connection_id),
        )
        if connection_id not in known_connections
    ]
    if missing:
        raise ValueError(f"{', '.join(missing)} not in connections")
    return value

tables_exists(value, info) classmethod

Validates that the source table exists in the databases.

Source code in cloe_metadata/shared/jobs/db2fs.py
47
48
49
50
51
52
53
54
@field_validator("databases")
@classmethod
def tables_exists(cls, value: base.Databases, info: ValidationInfo):
    """Ensure the job's ``source_table_id`` is a key of ``value.id_to_tables``.

    The check is skipped when ``base_obj`` is not present in ``info.data``.
    On success ``value`` is returned unchanged.
    """
    job: base.DB2FS | None = info.data.get("base_obj")
    if job is None or job.source_table_id in value.id_to_tables:
        return value
    raise ValueError("id not in tables")

ExecSQL

Bases: BaseModel

Base class for loading ExecSQL model objects.

Parameters:

Name Type Description Default
base_obj ExecSQL
required
connections Connections
required
Source code in cloe_metadata/shared/jobs/exec_sql.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
class ExecSQL(BaseModel):
    """Base class for loading ExecSQL model objects.

    Pairs a base ``ExecSQL`` job with the connections collection, checks that
    the job's ``connection_id`` refers to a known connection, and offers
    helpers for the rendered queries and the stored procedure name.
    """

    base_obj: exec_sql.ExecSQL
    connections: base.Connections = Field(..., exclude=True)

    @field_validator("connections")
    @classmethod
    def sink_connection_exists(cls, value: base.Connections, info: ValidationInfo):
        """
        Ensures the job's ``connection_id`` is present in the connections.

        Args:
            value (base.Connections): Connections collection under validation.
            info (ValidationInfo): Pydantic context; ``info.data`` may contain
                the already-validated ``base_obj``.

        Raises:
            ValueError: If ``connection_id`` is not a key of
                ``value.get_connections()``.

        Returns:
            base.Connections: ``value``, unchanged.
        """
        job: exec_sql.ExecSQL | None = info.data.get("base_obj")
        if job is None:
            return value
        if job.connection_id in value.get_connections():
            return value
        raise ValueError("connection_id not in connections")

    @property
    def sink_connection(self) -> base.Connection:
        """
        Looks up the connection referenced by ``base_obj.connection_id``.

        Returns:
            base.Connection: The sink connection object.
        """
        connections_by_id = self.connections.get_connections()
        return connections_by_id[self.base_obj.connection_id]

    def get_rendered_runtimes(self) -> list[str]:
        """
        Render every query of the job, ordered by ``exec_order``.

        Each query is passed through ``base_models.get_rendered_query``.

        Returns:
            list[str]: Rendered SQL strings in ascending ``exec_order``.
        """
        ordered_queries = sorted(
            self.base_obj.queries, key=lambda query: query.exec_order
        )
        return list(map(base_models.get_rendered_query, ordered_queries))

    def get_procedure_name(self) -> str:
        """
        Build the stored procedure name for this ExecSQL job.

        The job name is lowercased, every space becomes an underscore, and the
        result is prefixed with ``sp_``.

        Returns:
            str: The stored procedure name, e.g. ``"sp_load_customers"``.
        """
        slug = self.base_obj.name.lower().replace(" ", "_")
        return "sp_" + slug

sink_connection: base.Connection property

Returns the sink connection object.

Returns:

Type Description
Connection

base.Connection: The sink connection object.

get_procedure_name()

Returns the stored procedure name based on the ExecSQL name.

The name is formatted to be lowercase and spaces are replaced with underscores.

Returns:

Name Type Description
str str

The formatted stored procedure name.

Source code in cloe_metadata/shared/jobs/exec_sql.py
66
67
68
69
70
71
72
73
74
75
def get_procedure_name(self) -> str:
    """
    Build the stored procedure name for this ExecSQL job.

    The job name is lowercased, every space becomes an underscore, and the
    result is prefixed with ``sp_``.

    Returns:
        str: The stored procedure name, e.g. ``"sp_load_customers"``.
    """
    slug = self.base_obj.name.lower().replace(" ", "_")
    return "sp_" + slug

get_rendered_runtimes()

Returns a list of rendered SQL queries sorted by execution order.

The queries are rendered using the base_models.get_rendered_query method.

Returns:

Type Description
list[str]

list[str]: A list of rendered SQL queries.

Source code in cloe_metadata/shared/jobs/exec_sql.py
52
53
54
55
56
57
58
59
60
61
62
63
64
def get_rendered_runtimes(self) -> list[str]:
    """
    Render every query of the job, ordered by ``exec_order``.

    Each query is passed through ``base_models.get_rendered_query``.

    Returns:
        list[str]: Rendered SQL strings in ascending ``exec_order``.
    """
    ordered_queries = sorted(self.base_obj.queries, key=lambda query: query.exec_order)
    return list(map(base_models.get_rendered_query, ordered_queries))

sink_connection_exists(value, info) classmethod

Validates that the connection exists in the connections.

Parameters:

Name Type Description Default
value Connections

The connections object to validate.

required
info ValidationInfo

Information about the field being validated.

required

Raises:

Type Description
ValueError

If the connection_id does not exist in the connections.

Returns:

Type Description

base.Connections: The validated connections object.

Source code in cloe_metadata/shared/jobs/exec_sql.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@field_validator("connections")
@classmethod
def sink_connection_exists(cls, value: base.Connections, info: ValidationInfo):
    """
    Ensures the job's ``connection_id`` is present in the connections.

    Args:
        value (base.Connections): Connections collection under validation.
        info (ValidationInfo): Pydantic context; ``info.data`` may contain the
            already-validated ``base_obj``.

    Raises:
        ValueError: If ``connection_id`` is not a key of
            ``value.get_connections()``.

    Returns:
        base.Connections: ``value``, unchanged.
    """
    job: exec_sql.ExecSQL | None = info.data.get("base_obj")
    if job is None:
        return value
    if job.connection_id in value.get_connections():
        return value
    raise ValueError("connection_id not in connections")

FS2DB

Bases: BaseModel

Class for advanced or shared FS2DB functionality.

Parameters:

Name Type Description Default
base_obj FS2DB
required
dataset_types DatasetTypes
required
databases Databases
required
connections Connections
required
exec_sqls dict[UUID, ExecSQL]
required
Source code in cloe_metadata/shared/jobs/fs2db.py
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
class FS2DB(BaseModel):
    """Class for advanced or shared FS2DB functionality.

    Wraps a base ``FS2DB`` job together with the metadata collections it
    references. Field validators check that every ID referenced by
    ``base_obj`` exists in the matching collection; properties resolve those
    IDs to objects and render the job's Jinja templates.
    """

    base_obj: base.FS2DB
    dataset_types: base.DatasetTypes = Field(..., exclude=True)
    databases: base.Databases = Field(..., exclude=True)
    connections: base.Connections = Field(..., exclude=True)
    exec_sqls: dict[uuid.UUID, jobs.ExecSQL] = Field(..., exclude=True)

    # The validators below read ``base_obj`` from ``info.data``, which only
    # holds fields validated so far. When ``base_obj`` is missing there, the
    # cross-reference checks are skipped (hence the ``None`` checks).

    @field_validator("dataset_types")
    @classmethod
    def dataset_type_exists(cls, value: base.DatasetTypes, info: ValidationInfo):
        """Validates that the dataset type exists.

        Raises:
            ValueError: If ``base_obj.dataset_type_id`` is not in
                ``value.get_dataset_types()``.
        """
        base_obj: base.FS2DB | None = info.data.get("base_obj")
        if (
            base_obj is not None
            and base_obj.dataset_type_id not in value.get_dataset_types()
        ):
            raise ValueError("dataset_type_id does not exist")
        return value

    @field_validator("databases")
    @classmethod
    def tables_exists(cls, value: base.Databases, info: ValidationInfo):
        """Validates that the sink table exists in the databases.

        Raises:
            ValueError: If ``base_obj.sink_table_id`` is not a key of
                ``value.id_to_tables``.
        """
        base_obj: base.FS2DB | None = info.data.get("base_obj")
        if base_obj is not None and base_obj.sink_table_id not in value.id_to_tables:
            raise ValueError("id not in tables")
        return value

    @field_validator("connections")
    @classmethod
    def sink_connection_exists(cls, value: base.Connections, info: ValidationInfo):
        """Validates that the sink and source connections exist in the connections.

        Raises:
            ValueError: Naming every missing ID, e.g.
                ``"sink_connection_id, source_connection_id not in connections"``.
        """
        base_obj: base.FS2DB | None = info.data.get("base_obj")
        if base_obj is None:
            return value
        known_connections = value.get_connections()
        # Collect every missing reference so a single error reports all of them.
        missing = [
            field_name
            for field_name, connection_id in (
                ("sink_connection_id", base_obj.sink_connection_id),
                ("source_connection_id", base_obj.source_connection_id),
            )
            if connection_id not in known_connections
        ]
        if missing:
            raise ValueError(f"{', '.join(missing)} not in connections")
        return value

    @field_validator("exec_sqls")
    @classmethod
    def exec_sql_exists(
        cls, value: dict[uuid.UUID, jobs.ExecSQL], info: ValidationInfo
    ):
        """Validates that the post-load exec job exists in exec SQLs.

        A ``post_load_exec_job_id`` of None is allowed and skips the check.

        Raises:
            ValueError: If ``post_load_exec_job_id`` is set but is not a key
                of ``value``.
        """
        base_obj: base.FS2DB | None = info.data.get("base_obj")
        if (
            base_obj is not None
            and base_obj.post_load_exec_job_id is not None
            and base_obj.post_load_exec_job_id not in value
        ):
            raise ValueError("post_load_exec_job_id not in ExecSQL jobs")
        return value

    @property
    def source_connection(self) -> base.Connection:
        """Returns the source connection object."""
        return self.connections.get_connections()[self.base_obj.source_connection_id]

    @property
    def sink_connection(self) -> base.Connection:
        """Returns the sink connection object."""
        return self.connections.get_connections()[self.base_obj.sink_connection_id]

    @property
    def sink_table(self) -> base.Table:
        """Returns the sink table object."""
        return self.databases.id_to_tables[self.base_obj.sink_table_id]

    @property
    def postload_execjob(self) -> jobs.ExecSQL | None:
        """Returns the post-load exec job, or None if no job ID is configured."""
        job_id = self.base_obj.post_load_exec_job_id
        if job_id is None:
            return None
        return self.exec_sqls[job_id]

    @property
    def dataset_type(self) -> base.DatasetType:
        """Returns the dataset type object."""
        return self.dataset_types.get_dataset_types()[self.base_obj.dataset_type_id]

    @property
    def rendered_filename_pattern(self) -> str:
        """
        Renders and returns the filename pattern using Jinja2 templates.

        The following variables can be used in the Jinja template:

        - `ds_type_name`: The name of the dataset type.
        - `ds_type_format`: The storage format of the dataset type.

        Example Jinja template:
        ```
        {{ ds_type_name }}_{{ ds_type_format }}_file
        ```

        Returns:
            str: The rendered filename pattern.
        """
        return env.from_string(self.base_obj.filename_pattern).render(
            ds_type_name=self.dataset_type.name,
            ds_type_format=self.dataset_type.storage_format,
        )

    @property
    def rendered_folder_path_pattern(self) -> str:
        """
        Renders and returns the folder path pattern using Jinja2 templates.

        The following variables can be used in the Jinja template:

        - `ds_type_name`: The name of the dataset type.
        - `ds_type_format`: The storage format of the dataset type.

        Example Jinja template:
        ```
        /data/{{ ds_type_name }}/{{ ds_type_format }}/
        ```

        Returns:
            str: The rendered folder path pattern.
        """
        return env.from_string(self.base_obj.folder_path_pattern).render(
            ds_type_name=self.dataset_type.name,
            ds_type_format=self.dataset_type.storage_format,
        )

dataset_type: base.DatasetType property

Returns the dataset type object.

postload_execjob: jobs.ExecSQL | None property

Returns the post-load exec job if it exists.

rendered_filename_pattern: str property

Renders and returns the filename pattern using Jinja2 templates.

The following variables can be used in the Jinja template:

  • ds_type_name: The name of the dataset type.
  • ds_type_format: The storage format of the dataset type.

Example Jinja template:

{{ ds_type_name }}_{{ ds_type_format }}_file

Returns:

Name Type Description
str str

The rendered filename pattern.

rendered_folder_path_pattern: str property

Renders and returns the folder path pattern using Jinja2 templates.

The following variables can be used in the Jinja template:

  • ds_type_name: The name of the dataset type.
  • ds_type_format: The storage format of the dataset type.

Example Jinja template:

/data/{{ ds_type_name }}/{{ ds_type_format }}/

Returns:

Name Type Description
str str

The rendered folder path pattern.

sink_connection: base.Connection property

Returns the sink connection object.

sink_table: base.Table property

Returns the sink table object.

source_connection: base.Connection property

Returns the source connection object.

dataset_type_exists(value, info) classmethod

Validates that the dataset type exists.

Source code in cloe_metadata/shared/jobs/fs2db.py
22
23
24
25
26
27
28
29
30
31
32
@field_validator("dataset_types")
@classmethod
def dataset_type_exists(cls, value: base.DatasetTypes, info: ValidationInfo):
    """Ensure the job's ``dataset_type_id`` is one of ``value.get_dataset_types()``.

    The check is skipped when ``base_obj`` is not present in ``info.data``.
    On success ``value`` is returned unchanged.
    """
    job: base.FS2DB | None = info.data.get("base_obj")
    if job is None:
        return value
    known_types = value.get_dataset_types()
    if job.dataset_type_id not in known_types:
        raise ValueError("dataset_type_id does not exist")
    return value

exec_sql_exists(value, info) classmethod

Validates that the post-load exec job exists in exec SQLs.

Source code in cloe_metadata/shared/jobs/fs2db.py
63
64
65
66
67
68
69
70
71
72
73
74
75
76
@field_validator("exec_sqls")
@classmethod
def exec_sql_exists(
    cls, value: dict[uuid.UUID, jobs.ExecSQL], info: ValidationInfo
):
    """Ensure the optional post-load exec job refers to a known ExecSQL job.

    Nothing is checked when ``base_obj`` is not present in ``info.data`` or
    when its ``post_load_exec_job_id`` is None. On success ``value`` is
    returned unchanged.
    """
    job: base.FS2DB | None = info.data.get("base_obj")
    post_load_id = None if job is None else job.post_load_exec_job_id
    if post_load_id is None or post_load_id in value:
        return value
    raise ValueError("post_load_exec_job_id not in ExecSQL jobs")

sink_connection_exists(value, info) classmethod

Validates that both the sink connection and the source connection exist in the connections.

Source code in cloe_metadata/shared/jobs/fs2db.py
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
@field_validator("connections")
@classmethod
def sink_connection_exists(cls, value: base.Connections, info: ValidationInfo):
    """Validates that the sink and source connections exist in the connections.

    Args:
        value: The connections collection being validated.
        info: Pydantic validation context; ``info.data`` holds the fields
            validated so far, including ``base_obj`` when it was valid.

    Raises:
        ValueError: Naming every missing ID, e.g.
            ``"sink_connection_id, source_connection_id not in connections"``.

    Returns:
        The unchanged ``value``.
    """
    base_obj: base.FS2DB | None = info.data.get("base_obj")
    if base_obj is None:
        return value
    known_connections = value.get_connections()
    # Collect every missing reference so a single error reports all of them.
    missing = [
        field_name
        for field_name, connection_id in (
            ("sink_connection_id", base_obj.sink_connection_id),
            ("source_connection_id", base_obj.source_connection_id),
        )
        if connection_id not in known_connections
    ]
    if missing:
        raise ValueError(f"{', '.join(missing)} not in connections")
    return value

tables_exists(value, info) classmethod

Validates that the sink table exists in the databases.

Source code in cloe_metadata/shared/jobs/fs2db.py
34
35
36
37
38
39
40
41
@field_validator("databases")
@classmethod
def tables_exists(cls, value: base.Databases, info: ValidationInfo):
    """Ensure the job's ``sink_table_id`` is a key of ``value.id_to_tables``.

    The check is skipped when ``base_obj`` is not present in ``info.data``.
    On success ``value`` is returned unchanged.
    """
    job: base.FS2DB | None = info.data.get("base_obj")
    if job is None or job.sink_table_id in value.id_to_tables:
        return value
    raise ValueError("id not in tables")

Modeler

ColumnMapping

Bases: BaseModel

Class for advanced or shared ColumnMapping functionality.

Parameters:

Name Type Description Default
base_obj ColumnMapping
required
databases Databases
required
conversions ConversionTemplates
required
Source code in cloe_metadata/shared/modeler/dataflow/column_mapping.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
class ColumnMapping(BaseModel):
    """Class for advanced or shared ColumnMapping functionality.

    Wraps a base ``ColumnMapping`` together with the databases and conversion
    templates it references, and validates those references.
    """

    base_obj: column_mapping.ColumnMapping
    databases: base.Databases = Field(..., exclude=True)
    conversions: base.ConversionTemplates = Field(..., exclude=True)
    model_config = ConfigDict(arbitrary_types_allowed=True)

    @field_validator("databases")
    @classmethod
    def sink_table_exists(cls, value: base.Databases, info: ValidationInfo):
        """
        Validates that the sink table exists in the databases.

        Args:
            value (base.Databases): The databases object to validate.
            info (ValidationInfo): Information about the field being validated;
                ``info.data`` holds ``base_obj`` if it validated successfully.

        Raises:
            ValueError: If the sink_table_id does not exist in the databases.

        Returns:
            base.Databases: The validated databases object.
        """
        base_obj: column_mapping.ColumnMapping | None = info.data.get("base_obj")
        if base_obj is not None and base_obj.sink_table_id not in value.id_to_tables:
            raise ValueError("sink_table_id does not exist")
        return value

    @field_validator("conversions")
    @classmethod
    def conversion_exists(cls, value: base.ConversionTemplates, info: ValidationInfo):
        """
        Validates that the conversion template exists in the conversion templates.

        A ``convert_to_datatype`` of None is allowed and skips the check.

        Args:
            value (base.ConversionTemplates): The conversion templates object to validate.
            info (ValidationInfo): Information about the field being validated;
                ``info.data`` holds ``base_obj`` if it validated successfully.

        Raises:
            ValueError: If the convert_to_datatype does not exist in the conversion templates.

        Returns:
            base.ConversionTemplates: The validated conversion templates object.
        """
        base_obj: column_mapping.ColumnMapping | None = info.data.get("base_obj")
        if (
            base_obj is not None
            and base_obj.convert_to_datatype is not None
            and base_obj.convert_to_datatype not in value.get_templates()
        ):
            raise ValueError("convert_to_datatype does not exist")
        return value

    @property
    def sink_table(self) -> base.Table:
        """
        Returns the sink table object.

        The previous annotation/docstring promised ``None`` for a missing
        table, but the lookup indexes ``id_to_tables`` directly and never
        returns ``None``; the contract below reflects the actual behavior.

        Returns:
            base.Table: The table referenced by ``base_obj.sink_table_id``.

        Raises:
            KeyError: If ``sink_table_id`` is not a key of
                ``databases.id_to_tables``. The ``sink_table_exists`` validator
                normally rejects that case when the model is validated.
        """
        return self.databases.id_to_tables[self.base_obj.sink_table_id]

    @property
    def sink_schema_table(self) -> tuple[base.Schema | None, base.Table | None]:
        """
        Returns the sink schema and table objects.

        Delegates to ``databases.get_table_and_schema`` with the mapping's
        ``sink_table_id``.

        Returns:
            tuple[base.Schema | None, base.Table | None]: The sink schema and
            table objects; per the annotation, either element may be None.
        """
        return self.databases.get_table_and_schema(self.base_obj.sink_table_id)

sink_schema_table: tuple[base.Schema | None, base.Table | None] property

Returns the sink schema and table objects.

Returns:

Type Description
tuple[Schema | None, Table | None]

tuple[base.Schema | None, base.Table | None]: The sink schema and table objects, or None if they do not exist.

sink_table: base.Table | None property

Returns the sink table object.

Returns:

Type Description
Table | None

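base.Table | None: The sink table object. Despite the None in the annotation, the table is looked up by indexing id_to_tables directly, so a missing sink table raises an error instead of returning None.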

conversion_exists(value, info) classmethod

Validates that the conversion template exists in the conversion templates.

Parameters:

Name Type Description Default
value ConversionTemplates

The conversion templates object to validate.

required
info ValidationInfo

Information about the field being validated.

required

Raises:

Type Description
ValueError

If the convert_to_datatype does not exist in the conversion templates.

Returns:

Type Description

base.ConversionTemplates: The validated conversion templates object.

Source code in cloe_metadata/shared/modeler/dataflow/column_mapping.py
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
@field_validator("conversions")
@classmethod
def conversion_exists(cls, value: base.ConversionTemplates, info: ValidationInfo):
    """
    Ensure the mapping's target datatype has a matching conversion template.

    Validation is skipped when ``base_obj`` is unavailable or when the
    mapping does not request a datatype conversion.

    Args:
        value (base.ConversionTemplates): The conversion templates object to validate.
        info (ValidationInfo): Information about the field being validated.

    Raises:
        ValueError: If the convert_to_datatype does not exist in the conversion templates.

    Returns:
        base.ConversionTemplates: The validated conversion templates object.
    """
    mapping: column_mapping.ColumnMapping | None = info.data.get("base_obj")
    if mapping is None or mapping.convert_to_datatype is None:
        return value
    if mapping.convert_to_datatype in value.get_templates():
        return value
    raise ValueError("convert_to_datatype does not exist")

sink_table_exists(value, info) classmethod

Validates that the sink table exists in the databases.

Parameters:

Name Type Description Default
value Databases

The databases object to validate.

required
info ValidationInfo

Information about the field being validated.

required

Raises:

Type Description
ValueError

If the sink_table_id does not exist in the databases.

Returns:

Type Description

base.Databases: The validated databases object.

Source code in cloe_metadata/shared/modeler/dataflow/column_mapping.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@field_validator("databases")
@classmethod
def sink_table_exists(cls, value: base.Databases, info: ValidationInfo):
    """
    Ensure the column mapping's sink table is present in ``databases``.

    Validation is skipped when ``base_obj`` is not available in ``info.data``.

    Args:
        value (base.Databases): The databases object to validate.
        info (ValidationInfo): Information about the field being validated.

    Raises:
        ValueError: If the sink_table_id does not exist in the databases.

    Returns:
        base.Databases: The validated databases object.
    """
    mapping: column_mapping.ColumnMapping | None = info.data.get("base_obj")
    if mapping is None:
        return value
    if mapping.sink_table_id in value.id_to_tables:
        return value
    raise ValueError("sink_table_id does not exist")

Dataflow

Bases: BaseModel

Class for advanced or shared Dataflow functionality.

Parameters:

Name Type Description Default
base_obj Dataflow
required
shared_lookups list[Lookup]
required
shared_source_tables list[SourceTable]
required
shared_column_mappings list[ColumnMapping]
required
databases Databases
required
sql_templates SQLTemplates
required
Source code in cloe_metadata/shared/modeler/dataflow/dataflow.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
class Dataflow(BaseModel):
    """Class for advanced or shared Dataflow functionality.

    Wraps a base ``Dataflow`` together with its shared lookups, source tables
    and column mappings. It validates that the referenced sink table and SQL
    template exist. ``databases`` and ``sql_templates`` are excluded from
    serialization.
    """

    # Field order matters: pydantic validates fields in definition order, so
    # ``base_obj`` is already in ``info.data`` when the ``databases`` and
    # ``sql_templates`` validators run (it is absent if it failed validation).
    base_obj: base.Dataflow
    shared_lookups: list[lookup.Lookup]
    shared_source_tables: list[source_table.SourceTable]
    shared_column_mappings: list[column_mapping.ColumnMapping]
    databases: base.Databases = Field(..., exclude=True)
    sql_templates: base.SQLTemplates = Field(..., exclude=True)
    model_config = ConfigDict(arbitrary_types_allowed=True)

    @field_validator("databases")
    @classmethod
    def sink_table_exists(cls, value: base.Databases, info: ValidationInfo):
        """
        Validates that the sink table exists in the databases.

        Args:
            value (base.Databases): The databases object to validate.
            info (ValidationInfo): Information about the field being validated.

        Raises:
            ValueError: If the sink_table_id does not exist in the databases.

        Returns:
            base.Databases: The validated databases object.
        """
        base_obj: base.Dataflow | None = info.data.get("base_obj")
        if base_obj is not None and base_obj.sink_table_id not in value.id_to_tables:
            raise ValueError("sink_table_id does not exist")
        return value

    @field_validator("sql_templates")
    @classmethod
    def sql_template_exists(cls, value: base.SQLTemplates, info: ValidationInfo):
        """
        Validates that the SQL template exists in the SQL templates.

        Args:
            value (base.SQLTemplates): The SQL templates object to validate.
            info (ValidationInfo): Information about the field being validated.

        Raises:
            ValueError: If the sql_template_id does not exist in the SQL templates.

        Returns:
            base.SQLTemplates: The validated SQL templates object.
        """
        base_obj: base.Dataflow | None = info.data.get("base_obj")
        if (
            base_obj is not None
            and base_obj.sql_template_id not in value.get_templates()
        ):
            raise ValueError("sql_template_id does not exist")
        return value

    @property
    def sink_table(self) -> base.Table | None:
        """
        Returns the sink table object.

        Returns:
            base.Table | None: The sink table object, or None if it does not exist.
        """
        # ``.get`` honours the documented ``None`` result instead of raising
        # KeyError, consistent with ``sql_template`` below.
        return self.databases.id_to_tables.get(self.base_obj.sink_table_id)

    @property
    def sink_schema_table(self) -> tuple[base.Schema | None, base.Table | None]:
        """
        Returns the sink schema and table objects.

        Returns:
            tuple[base.Schema | None, base.Table | None]: The sink schema and table objects, or None if they do not exist.
        """
        return self.databases.get_table_and_schema(self.base_obj.sink_table_id)

    @property
    def sql_template(self) -> base.SQLTemplate | None:
        """
        Returns the SQL template object.

        Returns:
            base.SQLTemplate | None: The SQL template object, or None if it does not exist.
        """
        return self.sql_templates.get_templates().get(self.base_obj.sql_template_id)

sink_schema_table: tuple[base.Schema | None, base.Table | None] property

Returns the sink schema and table objects.

Returns:

Type Description
tuple[Schema | None, Table | None]

tuple[base.Schema | None, base.Table | None]: The sink schema and table objects, or None if they do not exist.

sink_table: base.Table | None property

Returns the sink table object.

Returns:

Type Description
Table | None

base.Table | None: The sink table object, or None if it does not exist.

sql_template: base.SQLTemplate | None property

Returns the SQL template object.

Returns:

Type Description
SQLTemplate | None

base.SQLTemplate | None: The SQL template object, or None if it does not exist.

sink_table_exists(value, info) classmethod

Validates that the sink table exists in the databases.

Parameters:

Name Type Description Default
value Databases

The databases object to validate.

required
info ValidationInfo

Information about the field being validated.

required

Raises:

Type Description
ValueError

If the sink_table_id does not exist in the databases.

Returns:

Type Description

base.Databases: The validated databases object.

Source code in cloe_metadata/shared/modeler/dataflow/dataflow.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
@field_validator("databases")
@classmethod
def sink_table_exists(cls, value: base.Databases, info: ValidationInfo):
    """
    Ensure the dataflow's sink table is registered in ``databases``.

    Nothing is checked when ``base_obj`` is missing from ``info.data``.

    Args:
        value (base.Databases): The databases object to validate.
        info (ValidationInfo): Information about the field being validated.

    Raises:
        ValueError: If the sink_table_id does not exist in the databases.

    Returns:
        base.Databases: The validated databases object.
    """
    dataflow: base.Dataflow | None = info.data.get("base_obj")
    if dataflow is None:
        return value
    if dataflow.sink_table_id in value.id_to_tables:
        return value
    raise ValueError("sink_table_id does not exist")

sql_template_exists(value, info) classmethod

Validates that the SQL template exists in the SQL templates.

Parameters:

Name Type Description Default
value SQLTemplates

The SQL templates object to validate.

required
info ValidationInfo

Information about the field being validated.

required

Raises:

Type Description
ValueError

If the sql_template_id does not exist in the SQL templates.

Returns:

Type Description

base.SQLTemplates: The validated SQL templates object.

Source code in cloe_metadata/shared/modeler/dataflow/dataflow.py
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
@field_validator("sql_templates")
@classmethod
def sql_template_exists(cls, value: base.SQLTemplates, info: ValidationInfo):
    """
    Ensure the dataflow's SQL template id is one of the known templates.

    Nothing is checked when ``base_obj`` is missing from ``info.data``.

    Args:
        value (base.SQLTemplates): The SQL templates object to validate.
        info (ValidationInfo): Information about the field being validated.

    Raises:
        ValueError: If the sql_template_id does not exist in the SQL templates.

    Returns:
        base.SQLTemplates: The validated SQL templates object.
    """
    dataflow: base.Dataflow | None = info.data.get("base_obj")
    if dataflow is None:
        return value
    known_templates = value.get_templates()
    if dataflow.sql_template_id not in known_templates:
        raise ValueError("sql_template_id does not exist")
    return value

Lookup

Bases: BaseModel

Class for advanced or shared Lookup functionality.

Parameters:

Name Type Description Default
base_obj Lookup
required
shared_return_column_mapping list[ReturnColumnMapping]
required
databases Databases
required
Source code in cloe_metadata/shared/modeler/dataflow/lookup.py
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
class Lookup(BaseModel):
    """Shared-layer wrapper around a lookup and its return column mappings.

    Checks that the lookup table exists in ``databases`` and offers
    accessors for it. ``databases`` is excluded from serialization.
    """

    # ``base_obj`` is declared before ``databases`` so that it is already in
    # ``info.data`` when the ``databases`` validator runs.
    base_obj: lookup.Lookup
    shared_return_column_mapping: list[ReturnColumnMapping]
    databases: base.Databases = Field(..., exclude=True)
    model_config = ConfigDict(arbitrary_types_allowed=True)

    @field_validator("databases")
    @classmethod
    def tenant_exists(cls, value: base.Databases, info: ValidationInfo):
        """
        Check that the lookup table referenced by ``base_obj`` is known.

        NOTE: despite its name, this validator checks ``lookup_table_id``,
        not a tenant.

        Args:
            value (base.Databases): The databases object to validate.
            info (ValidationInfo): Information about the field being validated.

        Raises:
            ValueError: If the lookup_table_id does not exist in the databases.

        Returns:
            base.Databases: The validated databases object.
        """
        lookup_obj: lookup.Lookup | None = info.data.get("base_obj")
        if lookup_obj is None:
            return value
        if lookup_obj.lookup_table_id in value.id_to_tables:
            return value
        raise ValueError("lookup_table_id does not exist")

    @property
    def source_table(self) -> base.Table | None:
        """
        The lookup (source) table object.

        Returns:
            base.Table | None: The source table object, or None if it does not exist.
        """
        lookup_table_id = self.base_obj.lookup_table_id
        return self.databases.id_to_tables.get(lookup_table_id)

    @property
    def source_schema_table(self) -> tuple[base.Schema | None, base.Table | None]:
        """
        The schema and table objects of the lookup (source) table.

        Returns:
            tuple[base.Schema | None, base.Table | None]: The source schema and table objects, or None if they do not exist.
        """
        lookup_table_id = self.base_obj.lookup_table_id
        return self.databases.get_table_and_schema(lookup_table_id)

source_schema_table: tuple[base.Schema | None, base.Table | None] property

Returns the source schema and table objects.

Returns:

Type Description
tuple[Schema | None, Table | None]

tuple[base.Schema | None, base.Table | None]: The source schema and table objects, or None if they do not exist.

source_table: base.Table | None property

Returns the source table object.

Returns:

Type Description
Table | None

base.Table | None: The source table object, or None if it does not exist.

tenant_exists(value, info) classmethod

Validates that the lookup table exists in the databases.

Parameters:

Name Type Description Default
value Databases

The databases object to validate.

required
info ValidationInfo

Information about the field being validated.

required

Raises:

Type Description
ValueError

If the lookup_table_id does not exist in the databases.

Returns:

Type Description

base.Databases: The validated databases object.

Source code in cloe_metadata/shared/modeler/dataflow/lookup.py
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
@field_validator("databases")
@classmethod
def tenant_exists(cls, value: base.Databases, info: ValidationInfo):
    """
    Check that the lookup table referenced by ``base_obj`` is known.

    NOTE: despite its name, this validator checks ``lookup_table_id``.

    Args:
        value (base.Databases): The databases object to validate.
        info (ValidationInfo): Information about the field being validated.

    Raises:
        ValueError: If the lookup_table_id does not exist in the databases.

    Returns:
        base.Databases: The validated databases object.
    """
    lookup_obj: lookup.Lookup | None = info.data.get("base_obj")
    if lookup_obj is None:
        return value
    if lookup_obj.lookup_table_id in value.id_to_tables:
        return value
    raise ValueError("lookup_table_id does not exist")

ReturnColumnMapping

Bases: BaseModel

Class for advanced or shared ReturnColumnMapping functionality.

Parameters:

Name Type Description Default
base_obj ReturnColumnMapping
required
databases Databases
required
Source code in cloe_metadata/shared/modeler/dataflow/lookup.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
class ReturnColumnMapping(BaseModel):
    """Shared-layer wrapper around a lookup return column mapping.

    Checks that the mapping's sink table exists in ``databases`` and offers
    accessors for it. ``databases`` is excluded from serialization.
    """

    # ``base_obj`` is declared first so it is available in ``info.data``
    # when the ``databases`` validator runs.
    base_obj: lookup.ReturnColumnMapping
    databases: base.Databases = Field(..., exclude=True)
    model_config = ConfigDict(arbitrary_types_allowed=True)

    @field_validator("databases")
    @classmethod
    def tenant_exists(cls, value: base.Databases, info: ValidationInfo):
        """
        Check that the sink table referenced by ``base_obj`` is known.

        NOTE: despite its name, this validator checks ``sink_table_id``.

        Args:
            value (base.Databases): The databases object to validate.
            info (ValidationInfo): Information about the field being validated.

        Raises:
            ValueError: If the sink_table_id does not exist in the databases.

        Returns:
            base.Databases: The validated databases object.
        """
        mapping: lookup.ReturnColumnMapping | None = info.data.get("base_obj")
        if mapping is None:
            return value
        if mapping.sink_table_id in value.id_to_tables:
            return value
        raise ValueError("sink_table_id does not exist")

    @property
    def sink_table(self) -> base.Table | None:
        """
        The sink table object of this mapping.

        Returns:
            base.Table | None: The sink table object, or None if it does not exist.
        """
        known_tables = self.databases.id_to_tables
        return known_tables.get(self.base_obj.sink_table_id)

    @property
    def sink_schema_table(self) -> tuple[base.Schema | None, base.Table | None]:
        """
        The schema and table objects of this mapping's sink table.

        Returns:
            tuple[base.Schema | None, base.Table | None]: The sink schema and table objects, or None if they do not exist.
        """
        sink_id = self.base_obj.sink_table_id
        return self.databases.get_table_and_schema(sink_id)

sink_schema_table: tuple[base.Schema | None, base.Table | None] property

Returns the sink schema and table objects.

Returns:

Type Description
tuple[Schema | None, Table | None]

tuple[base.Schema | None, base.Table | None]: The sink schema and table objects, or None if they do not exist.

sink_table: base.Table | None property

Returns the sink table object.

Returns:

Type Description
Table | None

base.Table | None: The sink table object, or None if it does not exist.

tenant_exists(value, info) classmethod

Validates that the sink table exists in the databases.

Parameters:

Name Type Description Default
value Databases

The databases object to validate.

required
info ValidationInfo

Information about the field being validated.

required

Raises:

Type Description
ValueError

If the sink_table_id does not exist in the databases.

Returns:

Type Description

base.Databases: The validated databases object.

Source code in cloe_metadata/shared/modeler/dataflow/lookup.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@field_validator("databases")
@classmethod
def tenant_exists(cls, value: base.Databases, info: ValidationInfo):
    """
    Check that the sink table referenced by ``base_obj`` is known.

    NOTE: despite its name, this validator checks ``sink_table_id``.

    Args:
        value (base.Databases): The databases object to validate.
        info (ValidationInfo): Information about the field being validated.

    Raises:
        ValueError: If the sink_table_id does not exist in the databases.

    Returns:
        base.Databases: The validated databases object.
    """
    mapping: lookup.ReturnColumnMapping | None = info.data.get("base_obj")
    if mapping is None:
        return value
    if mapping.sink_table_id in value.id_to_tables:
        return value
    raise ValueError("sink_table_id does not exist")

SourceTable

Bases: BaseModel

Class for advanced or shared SourceTable functionality.

Parameters:

Name Type Description Default
base_obj SourceTable
required
databases Databases
required
tenants Tenants
required
Source code in cloe_metadata/shared/modeler/dataflow/source_table.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
class SourceTable(BaseModel):
    """Shared-layer wrapper around a dataflow source table.

    Checks that the referenced table and (optional) tenant exist and offers
    accessors for them. ``databases`` and ``tenants`` are excluded from
    serialization.
    """

    # ``base_obj`` is declared first so it is available in ``info.data``
    # when the ``databases`` and ``tenants`` validators run.
    base_obj: source_table.SourceTable
    databases: base.Databases = Field(..., exclude=True)
    tenants: base.Tenants = Field(..., exclude=True)
    model_config = ConfigDict(arbitrary_types_allowed=True)

    @field_validator("databases")
    @classmethod
    def table_exists(cls, value: base.Databases, info: ValidationInfo):
        """
        Check that ``base_obj.table_id`` is a known table.

        Args:
            value (base.Databases): The databases object to validate.
            info (ValidationInfo): Information about the field being validated.

        Raises:
            ValueError: If the table_id does not exist in the databases.

        Returns:
            base.Databases: The validated databases object.
        """
        src: source_table.SourceTable | None = info.data.get("base_obj")
        if src is None:
            return value
        if src.table_id in value.id_to_tables:
            return value
        raise ValueError("table_id does not exist")

    @field_validator("tenants")
    @classmethod
    def tenant_exists(cls, value: base.Tenants, info: ValidationInfo):
        """
        Check that ``base_obj.tenant_id``, when set, is a known tenant.

        Args:
            value (base.Tenants): The tenants object to validate.
            info (ValidationInfo): Information about the field being validated.

        Raises:
            ValueError: If the tenant_id does not exist in the tenants.

        Returns:
            base.Tenants: The validated tenants object.
        """
        src: source_table.SourceTable | None = info.data.get("base_obj")
        if src is None or src.tenant_id is None:
            return value
        if src.tenant_id not in value.get_tenants():
            raise ValueError("tenant_id does not exist")
        return value

    @property
    def source_table(self) -> base.Table | None:
        """
        The source table object.

        Returns:
            base.Table | None: The source table object, or None if it does not exist.
        """
        known_tables = self.databases.id_to_tables
        return known_tables.get(self.base_obj.table_id)

    @property
    def source_schema_table(self) -> tuple[base.Schema | None, base.Table | None]:
        """
        The schema and table objects of the source table.

        Returns:
            tuple[base.Schema | None, base.Table | None]: The source schema and table objects, or None if they do not exist.
        """
        table_id = self.base_obj.table_id
        return self.databases.get_table_and_schema(table_id)

    @property
    def tenant(self) -> base.Tenant | None:
        """
        The tenant object, if the source table references one.

        Returns:
            base.Tenant | None: The tenant object, or None if it does not exist.
        """
        tenant_id = self.base_obj.tenant_id
        if tenant_id is None:
            return None
        return self.tenants.get_tenants().get(tenant_id)

source_schema_table: tuple[base.Schema | None, base.Table | None] property

Returns the source schema and table objects.

Returns:

Type Description
tuple[Schema | None, Table | None]

tuple[base.Schema | None, base.Table | None]: The source schema and table objects, or None if they do not exist.

source_table: base.Table | None property

Returns the source table object.

Returns:

Type Description
Table | None

base.Table | None: The source table object, or None if it does not exist.

tenant: base.Tenant | None property

Returns the tenant object.

Returns:

Type Description
Tenant | None

base.Tenant | None: The tenant object, or None if it does not exist.

table_exists(value, info) classmethod

Validates that the source table exists in the databases.

Parameters:

Name Type Description Default
value Databases

The databases object to validate.

required
info ValidationInfo

Information about the field being validated.

required

Raises:

Type Description
ValueError

If the table_id does not exist in the databases.

Returns:

Type Description

base.Databases: The validated databases object.

Source code in cloe_metadata/shared/modeler/dataflow/source_table.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@field_validator("databases")
@classmethod
def table_exists(cls, value: base.Databases, info: ValidationInfo):
    """
    Check that ``base_obj.table_id`` is a known table.

    Nothing is checked when ``base_obj`` is missing from ``info.data``.

    Args:
        value (base.Databases): The databases object to validate.
        info (ValidationInfo): Information about the field being validated.

    Raises:
        ValueError: If the table_id does not exist in the databases.

    Returns:
        base.Databases: The validated databases object.
    """
    src: source_table.SourceTable | None = info.data.get("base_obj")
    if src is None:
        return value
    if src.table_id in value.id_to_tables:
        return value
    raise ValueError("table_id does not exist")

tenant_exists(value, info) classmethod

Validates that the tenant exists in the tenants.

Parameters:

Name Type Description Default
value Tenants

The tenants object to validate.

required
info ValidationInfo

Information about the field being validated.

required

Raises:

Type Description
ValueError

If the tenant_id does not exist in the tenants.

Returns:

Type Description

base.Tenants: The validated tenants object.

Source code in cloe_metadata/shared/modeler/dataflow/source_table.py
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
@field_validator("tenants")
@classmethod
def tenant_exists(cls, value: base.Tenants, info: ValidationInfo):
    """
    Check that ``base_obj.tenant_id``, when set, is a known tenant.

    Nothing is checked when ``base_obj`` is missing or has no tenant.

    Args:
        value (base.Tenants): The tenants object to validate.
        info (ValidationInfo): Information about the field being validated.

    Raises:
        ValueError: If the tenant_id does not exist in the tenants.

    Returns:
        base.Tenants: The validated tenants object.
    """
    src: source_table.SourceTable | None = info.data.get("base_obj")
    if src is None or src.tenant_id is None:
        return value
    if src.tenant_id not in value.get_tenants():
        raise ValueError("tenant_id does not exist")
    return value

CustomDataflow

Bases: BaseModel

Class for advanced or shared CustomDataflow functionality.

Parameters:

Name Type Description Default
base_obj CustomDataflow
required
shared_table_mappings list[TableMapping]
required
Source code in cloe_metadata/shared/modeler/custom_dataflow/custom_dataflow.py
11
12
13
14
15
class CustomDataflow(BaseModel):
    """Class for advanced or shared CustomDataflow functionality.

    Pairs a base ``CustomDataflow`` with its shared-layer table mappings.
    This class defines no validators of its own.
    """

    # The base-layer custom dataflow object being wrapped.
    base_obj: base.CustomDataflow
    # Shared-layer TableMapping wrappers; presumably one per table mapping of
    # base_obj — TODO confirm against the code that builds this object.
    shared_table_mappings: list[table_mapping.TableMapping]

TableMapping

Bases: BaseModel

Class for advanced or shared TableMapping functionality.

Parameters:

Name Type Description Default
base_obj TableMapping
required
databases Databases
required
Source code in cloe_metadata/shared/modeler/custom_dataflow/table_mapping.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
class TableMapping(BaseModel):
    """Class for advanced or shared TableMapping functionality.

    Validates that both the sink and the source table of the mapping exist
    in ``databases`` and offers accessors for them. ``databases`` is excluded
    from serialization.
    """

    # ``base_obj`` is declared first so it is available in ``info.data``
    # when the ``databases`` validator runs.
    base_obj: table_mapping.TableMapping
    databases: base.Databases = Field(..., exclude=True)
    model_config = ConfigDict(arbitrary_types_allowed=True)

    @field_validator("databases")
    @classmethod
    def sink_table_exists(cls, value: base.Databases, info: ValidationInfo):
        """
        Validates that the sink and source tables exist in the databases.

        All missing ids are reported in a single error.

        Args:
            value (base.Databases): The databases object to validate.
            info (ValidationInfo): Information about the field being validated.

        Raises:
            ValueError: If either the sink_table_id or source_table_id do not exist in the databases.

        Returns:
            base.Databases: The validated databases object.
        """
        base_obj: table_mapping.TableMapping | None = info.data.get("base_obj")
        if base_obj is None:
            return value
        missing = [
            field_name
            for field_name, table_id in (
                ("sink_table_id", base_obj.sink_table_id),
                ("source_table_id", base_obj.source_table_id),
            )
            if table_id not in value.id_to_tables
        ]
        if missing:
            # Joined cleanly, unlike the previous concatenation, which produced
            # doubled/leading spaces (e.g. "sink_table_id  source_table_id ").
            raise ValueError(f"{', '.join(missing)} not in databases")
        return value

    @property
    def sink_table(self) -> base.Table | None:
        """
        Returns the sink table object.

        Returns:
            base.Table | None: The sink table object, or None if it does not exist.
        """
        # ``.get`` honours the documented ``None`` result instead of raising KeyError.
        return self.databases.id_to_tables.get(self.base_obj.sink_table_id)

    @property
    def sink_schema_table(self) -> tuple[base.Schema | None, base.Table | None]:
        """
        Returns the sink schema and table objects.

        Returns:
            tuple[base.Schema | None, base.Table | None]: The sink schema and table objects, or None if they do not exist.
        """
        return self.databases.get_table_and_schema(self.base_obj.sink_table_id)

    @property
    def source_table(self) -> base.Table | None:
        """
        Returns the source table object.

        Returns:
            base.Table | None: The source table object, or None if it does not exist.
        """
        # ``.get`` honours the documented ``None`` result instead of raising KeyError.
        return self.databases.id_to_tables.get(self.base_obj.source_table_id)

    @property
    def source_schema_table(self) -> tuple[base.Schema | None, base.Table | None]:
        """
        Returns the source schema and table objects.

        Returns:
            tuple[base.Schema | None, base.Table | None]: The source schema and table objects, or None if they do not exist.
        """
        return self.databases.get_table_and_schema(self.base_obj.source_table_id)

sink_schema_table: tuple[base.Schema | None, base.Table | None] property

Returns the sink schema and table objects.

Returns:

Type Description
tuple[Schema | None, Table | None]

tuple[base.Schema | None, base.Table | None]: The sink schema and table objects, or None if they do not exist.

sink_table: base.Table | None property

Returns the sink table object.

Returns:

Type Description
Table | None

base.Table | None: The sink table object, or None if it does not exist.

source_schema_table: tuple[base.Schema | None, base.Table | None] property

Returns the source schema and table objects.

Returns:

Type Description
tuple[Schema | None, Table | None]

tuple[base.Schema | None, base.Table | None]: The source schema and table objects, or None if they do not exist.

source_table: base.Table | None property

Returns the source table object.

Returns:

Type Description
Table | None

base.Table | None: The source table object, or None if it does not exist.

sink_table_exists(value, info) classmethod

Validates that the sink and source tables exist in the databases.

Parameters:

Name Type Description Default
value Databases

The databases object to validate.

required
info ValidationInfo

Information about the field being validated.

required

Raises:

Type Description
ValueError

If either the sink_table_id or source_table_id do not exist in the databases.

Returns:

Type Description

base.Databases: The validated databases object.

Source code in cloe_metadata/shared/modeler/custom_dataflow/table_mapping.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
@field_validator("databases")
@classmethod
def sink_table_exists(cls, value: base.Databases, info: ValidationInfo):
    """
    Validates that the sink and source tables exist in the databases.

    All missing ids are reported in a single error.

    Args:
        value (base.Databases): The databases object to validate.
        info (ValidationInfo): Information about the field being validated.

    Raises:
        ValueError: If either the sink_table_id or source_table_id do not exist in the databases.

    Returns:
        base.Databases: The validated databases object.
    """
    base_obj: table_mapping.TableMapping | None = info.data.get("base_obj")
    if base_obj is None:
        return value
    missing = [
        field_name
        for field_name, table_id in (
            ("sink_table_id", base_obj.sink_table_id),
            ("source_table_id", base_obj.source_table_id),
        )
        if table_id not in value.id_to_tables
    ]
    if missing:
        # Joined cleanly, unlike the previous concatenation, which produced
        # doubled/leading spaces (e.g. "sink_table_id  source_table_id ").
        raise ValueError(f"{', '.join(missing)} not in databases")
    return value

Repository

DataSourceInfo

Bases: BaseModel

Class for advanced or shared DataSourceInfo functionality.

Parameters:

Name Type Description Default
base_obj DataSourceInfo
required
sourcesystems Sourcesystems
required
tenants Tenants
required
Source code in cloe_metadata/shared/repository/data_source_info.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
class DataSourceInfo(BaseModel):
    """Class for advanced or shared DataSourceInfo functionality.

    Wraps a ``base.DataSourceInfo`` and checks that the source system it
    references (and its tenant, when one is set) are present in the supplied
    collections. Also exposes those referenced objects as properties.
    """

    base_obj: base.DataSourceInfo
    sourcesystems: base.Sourcesystems = Field(..., exclude=True)
    tenants: base.Tenants = Field(..., exclude=True)
    model_config = ConfigDict(arbitrary_types_allowed=True)

    @field_validator("sourcesystems")
    @classmethod
    def sourcesystem_exists(cls, value: base.Sourcesystems, info: ValidationInfo):
        """
        Ensures the referenced source system is part of ``value``.

        Args:
            value (base.Sourcesystems): Candidate source systems collection.
            info (ValidationInfo): Validation context; ``info.data`` may lack
                ``base_obj`` (e.g. if it failed validation), in which case the
                check is skipped.

        Raises:
            ValueError: If ``base_obj.sourcesystem_id`` is not contained in
                ``value.get_sourcesystems()``.

        Returns:
            base.Sourcesystems: ``value``, unchanged.
        """
        data_source: base.DataSourceInfo | None = info.data.get("base_obj")
        if data_source is None:
            return value
        if data_source.sourcesystem_id not in value.get_sourcesystems():
            raise ValueError("sourcesystem does not exist")
        return value

    @field_validator("tenants")
    @classmethod
    def tenant_exists(cls, value: base.Tenants, info: ValidationInfo):
        """
        Ensures the referenced tenant, if any, is part of ``value``.

        Args:
            value (base.Tenants): Candidate tenants collection.
            info (ValidationInfo): Validation context; ``info.data`` may lack
                ``base_obj`` (e.g. if it failed validation), in which case the
                check is skipped.

        Raises:
            ValueError: If ``base_obj.tenant_id`` is set but not contained in
                ``value.get_tenants()``.

        Returns:
            base.Tenants: ``value``, unchanged.
        """
        data_source: base.DataSourceInfo | None = info.data.get("base_obj")
        # No base object, or no tenant configured: nothing to check.
        if data_source is None or data_source.tenant_id is None:
            return value
        if data_source.tenant_id not in value.get_tenants():
            raise ValueError("tenant does not exist")
        return value

    @property
    def tenant(self) -> base.Tenant | None:
        """
        Resolves the tenant referenced by ``base_obj``.

        Returns:
            base.Tenant | None: The tenant, or None when no tenant_id is set
            or the id is not found in ``tenants``.
        """
        tenant_id = self.base_obj.tenant_id
        if tenant_id is None:
            return None
        return self.tenants.get_tenants().get(tenant_id)

    @property
    def sourcesystem(self) -> base.Sourcesystem:
        """
        Resolves the source system referenced by ``base_obj``.

        Returns:
            base.Sourcesystem: The source system keyed by
            ``base_obj.sourcesystem_id``.
        """
        available = self.sourcesystems.get_sourcesystems()
        return available[self.base_obj.sourcesystem_id]

sourcesystem: base.Sourcesystem property

Returns the source system object.

Returns:

Type Description
Sourcesystem

base.Sourcesystem: The source system object.

tenant: base.Tenant | None property

Returns the tenant object.

Returns:

Type Description
Tenant | None

base.Tenant | None: The tenant object, or None if no tenant_id is set or the tenant is not found.

sourcesystem_exists(value, info) classmethod

Validates that the source system exists in the source systems.

Parameters:

Name Type Description Default
value Sourcesystems

The source systems object to validate.

required
info ValidationInfo

Information about the field being validated.

required

Raises:

Type Description
ValueError

If the sourcesystem_id does not exist in the source systems.

Returns:

Type Description

base.Sourcesystems: The validated source systems object.

Source code in cloe_metadata/shared/repository/data_source_info.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@field_validator("sourcesystems")
@classmethod
def sourcesystem_exists(cls, value: base.Sourcesystems, info: ValidationInfo):
    """
    Ensures the referenced source system is part of ``value``.

    Args:
        value (base.Sourcesystems): Candidate source systems collection.
        info (ValidationInfo): Validation context; ``info.data`` may lack
            ``base_obj`` (e.g. if it failed validation), in which case the
            check is skipped.

    Raises:
        ValueError: If ``base_obj.sourcesystem_id`` is not contained in
            ``value.get_sourcesystems()``.

    Returns:
        base.Sourcesystems: ``value``, unchanged.
    """
    data_source: base.DataSourceInfo | None = info.data.get("base_obj")
    if data_source is None:
        return value
    if data_source.sourcesystem_id not in value.get_sourcesystems():
        raise ValueError("sourcesystem does not exist")
    return value

tenant_exists(value, info) classmethod

Validates that the tenant exists in the tenants.

Parameters:

Name Type Description Default
value Tenants

The tenants object to validate.

required
info ValidationInfo

Information about the field being validated.

required

Raises:

Type Description
ValueError

If the tenant_id does not exist in the tenants.

Returns:

Type Description

base.Tenants: The validated tenants object.

Source code in cloe_metadata/shared/repository/data_source_info.py
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
@field_validator("tenants")
@classmethod
def tenant_exists(cls, value: base.Tenants, info: ValidationInfo):
    """
    Ensures the referenced tenant, if any, is part of ``value``.

    Args:
        value (base.Tenants): Candidate tenants collection.
        info (ValidationInfo): Validation context; ``info.data`` may lack
            ``base_obj`` (e.g. if it failed validation), in which case the
            check is skipped.

    Raises:
        ValueError: If ``base_obj.tenant_id`` is set but not contained in
            ``value.get_tenants()``.

    Returns:
        base.Tenants: ``value``, unchanged.
    """
    data_source: base.DataSourceInfo | None = info.data.get("base_obj")
    # No base object, or no tenant configured: nothing to check.
    if data_source is None or data_source.tenant_id is None:
        return value
    if data_source.tenant_id not in value.get_tenants():
        raise ValueError("tenant does not exist")
    return value