Database

The following entities must be deployed for CLOE to work correctly. In general, technical entities such as the FileCatalog, file formats, external/internal stages and stored procedures (either internal ones or those produced as output of Exec SQL jobs) are assumed to reside in the cloe_dwh schema. The cloe_dwh schema must be accessible via the CLOE technical user's default database.

The cloe_dwh schema or why CLOE fails to find stored procedures

CLOE does not use fully qualified names (absolute reference notation) when executing stored procedures, whether they are the output of an Exec SQL job or internal CLOE procedures. Instead, it simply executes:

CALL cloe_dwh.merge_kundendaten();

or

INSERT INTO cloe_dwh.filecatalog ([...]) VALUES ([...]);

If the cloe_dwh schema is not located in the database that is set as the CLOE technical user's default or specified in the connection string, CLOE will not find it. The same is true for the FileCatalog.
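A minimal sketch of how the default database could be set for the technical user in Snowflake so that the unqualified cloe_dwh references resolve. The user name CLOE_TECH_USER and the database name MY_DWH are placeholders and not part of CLOE; adapt them to your environment.

```sql
-- Assumption: CLOE_TECH_USER and MY_DWH are hypothetical names.
-- Make MY_DWH the database that is active when the user logs in.
ALTER USER CLOE_TECH_USER SET DEFAULT_NAMESPACE = 'MY_DWH';

-- Run as the CLOE technical user in a new session to check
-- which database the session starts in.
SELECT CURRENT_DATABASE();

-- The schema has to be listed here for the unqualified CALL to resolve.
SHOW SCHEMAS LIKE 'cloe_dwh' IN DATABASE MY_DWH;
```

A database given in the connection string takes the place of the user default, so only one of the two needs to point at the database that contains cloe_dwh.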

All stage tables and ODS and/or core tables must also exist by the time the ETL process starts.

Tables

CLOE stores metadata about files that have already been processed or loaded in a special table called FileCatalog. This table needs to be deployed in the cloe_dwh schema and be accessible by the CLOE ETL users.

DDL FileCatalog (Snowflake)
    -- Column names are left unquoted so that the unquoted references in the
    -- CLOE procedures (e.g. filepath, filename) resolve to them.
    CREATE TABLE cloe_dwh.FileCatalog
    (
        ID NUMBER IDENTITY,
        lock_uuid VARCHAR,
        FilePath VARCHAR,
        FileName VARCHAR,
        FileParts NUMBER,
        FileStatus NUMBER DEFAULT 0,
        DatasourceInfo_ID VARCHAR,
        DatasetType_ID VARCHAR,
        CN_FileStorage_ID VARCHAR,
        SequenceColumnName VARCHAR,
        LastSequence VARCHAR,
        Message VARCHAR,
        _DCR TIMESTAMP_LTZ DEFAULT current_timestamp(),
        _DMR TIMESTAMP_TZ NULL
    );
DDL FileCatalog (T-SQL)
    CREATE TABLE [cloe_dwh].[FileCatalog]
    (
        [ID] INT NOT NULL IDENTITY(1,1),
        [FilePath] NVARCHAR(255) NULL,
        [FileName] NVARCHAR(255) NULL,
        [FileStatus] TINYINT NULL DEFAULT 0,
        [DatasourceInfo_ID] NVARCHAR(255) NULL,
        [DatasetType_ID] NVARCHAR(255) NULL,
        [CN_FileStorage_ID] NVARCHAR(255) NULL,
        [SequenceColumnName] NVARCHAR(255) NULL,
        [LastSequence] NVARCHAR(255) NULL,
        [lock_uuid] NVARCHAR(255) NULL,
        [message] NVARCHAR(MAX) NULL,
        [_DCR] DATETIME NOT NULL DEFAULT GETDATE(),
        [_DMR] DATETIME NULL
    )
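The meaning of the FileStatus values is not spelled out here. Judging from the procedures in the next section, 0 is the default for a new entry, the T-SQL retrieval procedure sets 1 when it locks a file, and the update procedures write 2 for a successful load and 3 for a failed one. Assuming that reading is correct, a query along these lines could be used to inspect failed files (written with unquoted column names, as in the CLOE procedures):

```sql
-- Assumption: FileStatus 3 marks a failed load, as set by the update procedures.
SELECT FilePath, FileName, Message, _DCR
FROM cloe_dwh.FileCatalog
WHERE FileStatus = 3
ORDER BY _DCR DESC;
```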

Internal functions

The following functions must be created and be accessible by the CLOE ETL user. If Snowflake is used as the DWH, the external/internal stage in the cloe_dwh schema must also be accessible by the CLOE ETL user.

CLOE needs this function to retrieve files directly from storage. This is the case when CLOE does not perform the extraction itself and only processes files provided by external tools.

DDL Retrieving files from stage procedure (Snowflake)
    CREATE OR REPLACE PROCEDURE cloe_dwh.retrieve_files_from_blob(file_filter varchar)
    RETURNS varchar
    LANGUAGE JAVASCRIPT
    EXECUTE AS CALLER
    AS
    $$
        // List the staged files matching the filter; the result is read back via RESULT_SCAN below.
        var sql_command_1 = `LIST @cloe_dwh.{{ STAGE_NAME }} pattern = '` + FILE_FILTER + `'`;
        snowflake.execute( {sqlText: sql_command_1} );
        // Build a comma-separated list, ordered by last_modified, of staged files
        // that are not yet registered in the FileCatalog.
        var sql_command_2 = `
        with filepath_prepare (filepath, last_modified) as (
            SELECT array_to_string(array_slice(split("name", '/'), 4, array_size(split("name", '/'))), '/') AS filepath, "last_modified"
            FROM table(RESULT_SCAN(LAST_QUERY_ID()))
        ), file_list (retrieved_files) as (
            select listagg(filepath, ',') within group (order by last_modified) AS files
            from filepath_prepare
            where filepath not in (select concat(coalesce(concat(filepath, '/'), '/'), filename) from cloe_dwh.FileCatalog)
        )
        select retrieved_files
        from file_list
        where len(retrieved_files) > 1
        `;
        var result = snowflake.execute( {sqlText: sql_command_2} );
        result.next();
        return result.getColumnValue(1);
    $$
    ;
DDL Retrieving files from stage procedure (T-SQL)
    -- Planned
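Once the Snowflake procedure above has been created, it can be called with a file filter. The sketch below assumes the filter is a regular expression, since it is passed straight to the pattern option of LIST; the concrete pattern is only an illustration.

```sql
-- Assumption: the pattern is a regular expression, as expected by LIST ... PATTERN.
-- Asks for the staged CSV files that are not yet registered in the FileCatalog;
-- the procedure returns them as one comma-separated string.
CALL cloe_dwh.retrieve_files_from_blob('.*[.]csv');
```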

This function is needed to retrieve files from the FileCatalog. It is always required.

DDL Retrieving files from FileCatalog procedure (Snowflake)
    -- Not needed
DDL Retrieving files from FileCatalog procedure (T-SQL)
    CREATE PROCEDURE [cloe_dwh].[spGetFilesFromFileCatalog]
        @pFilepathPattern NVARCHAR(255),
        @pFilenamePattern NVARCHAR(255)

    AS
    BEGIN
        -- Tag all matching files with a fresh lock id so this call can identify its own rows.
        DECLARE @lock_id NVARCHAR(255)
        SET @lock_id = NEWID()
        UPDATE [cloe_dwh].[FileCatalog]
        SET [FileStatus] = 1, [lock_uuid] = @lock_id
        WHERE [FilePath] LIKE @pFilepathPattern AND [FileName] LIKE @pFilenamePattern

        -- Return the locked file with the lowest ID.
        SELECT
            TOP 1
            [ID],
            [FilePath],
            [FileName]
        FROM [cloe_dwh].[FileCatalog]
        WHERE [FileStatus] = 1 AND [lock_uuid] = @lock_id
        ORDER BY [ID]
    END

Finally, the last function is needed for CLOE to be able to update the status of files once they have been processed.

DDL Update FileCatalog procedure (Snowflake)
    CREATE OR REPLACE PROCEDURE cloe_dwh.update_file_catalog(file_path varchar, message varchar)
    RETURNS varchar
    LANGUAGE JAVASCRIPT
    EXECUTE AS CALLER
    AS
    $$
        // Split the full path into directory part and file name.
        var splitted_path = FILE_PATH.split('/');
        var file_name = splitted_path[splitted_path.length - 1];
        var file_path;
        if (splitted_path.length > 1) {
            file_path = "'" + splitted_path.slice(0, -1).join('/') + "'";
        } else {
            file_path = 'NULL';
        }
        // A message marks the load as failed (FileStatus 3); without one it succeeded (FileStatus 2).
        var message;
        var file_status;
        if (MESSAGE && MESSAGE.length > 1) {
            // Double single quotes so the message cannot break the SQL string literal.
            message = "'" + MESSAGE.replace(/'/g, "''") + "'";
            file_status = 3;
        } else {
            message = 'NULL';
            file_status = 2;
        }
        var sql_command_1 = `
        INSERT INTO cloe_dwh.FileCatalog
            (FilePath, FileName, FileStatus, FileParts, DatasourceInfo_ID, DatasetType_ID, CN_FileStorage_ID, SequenceColumnName, LastSequence, Message)
            VALUES (` + file_path + `, '` + file_name + `', ` + file_status + `, 1, NULL, NULL, NULL, NULL, NULL, ` + message + `)`;
        snowflake.execute( {sqlText: sql_command_1} );
        return "true";
    $$
    ;
DDL Update FileCatalog procedure (T-SQL)
    CREATE PROCEDURE [cloe_dwh].[spUpdateFileCatalog]
        @pFilecatalogID INT,
        @pFileLoadFailed BIT
    AS
    BEGIN
        SET NOCOUNT ON
        IF @pFileLoadFailed = 1
        BEGIN
            UPDATE [cloe_dwh].[FileCatalog]
            SET [FileStatus] = 3
            WHERE ID = @pFilecatalogID
        END
        ELSE
        BEGIN
            UPDATE [cloe_dwh].[FileCatalog]
            SET [FileStatus] = 2
            WHERE ID = @pFilecatalogID
        END
    END
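As an illustration of the Snowflake variant, the two calls below show how a successful and a failed load might be recorded. The file path and the error text are made up; an empty string is passed as the message for the successful case.

```sql
-- Hypothetical path; records the file as loaded successfully (FileStatus 2).
CALL cloe_dwh.update_file_catalog('customers/2024/customers_01.csv', '');

-- Same kind of call with an error message, which records the file as failed (FileStatus 3).
CALL cloe_dwh.update_file_catalog('customers/2024/customers_02.csv', 'Column count mismatch');
```

In both cases the procedure splits the path at the last slash and stores the directory part in FilePath and the remainder in FileName.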

Privileges Snowflake

Tip

The following paragraphs are mostly directed at experienced Snowflake users. If you are unsure how to create stages, how to work with file formats or how the Snowflake privilege concept works, feel free to reach out to the Cloud-DWH Team.

CLOE needs read/write access to all tables that are part of the ETL process. In addition, it needs usage privileges for the procedures and access to all stages it is going to retrieve data from. This guide is preliminary and will be updated to align properly with the role concept.
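Until the role concept is documented, the requirements above could be sketched with grants like the following. This is not an official CLOE setup script: the database, role, stage and file format names are all placeholders, and the exact set of privileges depends on your environment.

```sql
-- Assumptions: MY_DWH, CLOE_ETL_ROLE, MY_EXTERNAL_STAGE, MY_INTERNAL_STAGE and
-- MY_FILE_FORMAT are hypothetical names.
GRANT USAGE ON DATABASE MY_DWH TO ROLE CLOE_ETL_ROLE;
GRANT USAGE ON SCHEMA MY_DWH.cloe_dwh TO ROLE CLOE_ETL_ROLE;

-- Read/write access to the tables used by the ETL process.
GRANT SELECT, INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA MY_DWH.cloe_dwh TO ROLE CLOE_ETL_ROLE;

-- Permission to call the stored procedures.
GRANT USAGE ON ALL PROCEDURES IN SCHEMA MY_DWH.cloe_dwh TO ROLE CLOE_ETL_ROLE;

-- External stages are granted USAGE, internal stages READ (and WRITE if needed).
GRANT USAGE ON STAGE MY_DWH.cloe_dwh.MY_EXTERNAL_STAGE TO ROLE CLOE_ETL_ROLE;
GRANT READ ON STAGE MY_DWH.cloe_dwh.MY_INTERNAL_STAGE TO ROLE CLOE_ETL_ROLE;

-- File formats referenced when reading from the stages.
GRANT USAGE ON FILE FORMAT MY_DWH.cloe_dwh.MY_FILE_FORMAT TO ROLE CLOE_ETL_ROLE;
```

The same table grants would have to be repeated for every other schema that holds stage, ODS or core tables touched by the ETL process.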