dwh-service

1.0.3 • Public • Published

dwh-service

Содержание

Описание

dwh-service является монорепой для ELT-скриптов обработки данных и включает в себя:

ПО Версия Описание
Apache Airflow 2.3.3 Оркестрация ELT-операций
dbt 1.2.0 (T)ransform-операции в SQL-нотации
Spark, PySpark 3.3.0 (E)xtract- и (L)oad-операции
ADB 6.21.1 Хранилище данных
ADQM 22.3.7 Витрины данных

Дополнительно в состав проекта включены конфиги docker для развёртывания локального окружения, эмулирующего целевое. Подробную информацию о локальном стенде, разворачиваемом из docker-образов, см. в описании.

Структура проекта

dwh-service
├───airflow
├───dbt
├───docker
├───spark
├───sql_scripts
└───README.md

Сквозной процесс работы с одной сущностью в проекте

Для примера возьмём таблицу statement.decision в БД ТСР.

Необходимо загрузить таблицу в БД ХОАД, сделать необходимые преобразования, протащить её до схемы dwh_core и сделать витрину, зависящую от этой таблицы.

Если таблица уже добавлена в конфиг и нужно только обновить в ней данные, то достаточно выполнить только пункты 4 и 5.

1. Добавление/обновление метаданных таблицы в конфигурационной БД

Инструмент - локальная консоль (python).

Используем скрипт spark_development/get_db_structure.py. Вызываем его локально следующим образом:

python get_db_structure.py --db "ид БД ТСР в dwh_stg_config.load_data_config_sources" --sch "схема для загрузки (statement)" --meta_db "DEV или TEST"

ВАЖНО! Если БД источник - HBase, то для корректной работы необходимо установить Java (JDK), а также положить драйвер для подключения к HBase (phoenix-5.0.0-HBase-2.0-thin-client.jar) в папку spark/packages (предварительно папку надо создать)

скрипт работает только на python 3.10. Если установлена более ранняя версия python, то можно попросить загрузить кого-нибудь из коллег, у кого установлен python 3.10

В результате выполнения скрипта в таблице dwh_stg_config.source_db_structure добавятся/обновятся данные о всех таблицах в схеме statement в БД ТСР.

2. Добавление таблицы в конфигурационную таблицу

Инструмент - средство для работы с БД (например, DBeaver) с подключением к конфигурационной БД.

Используемые функции:

  1. dwh_stg_config.get_config_from_source - просмотр конфигурационных данных
  2. dwh_stg_config.insert_jdbc_config_from_source - добавление конфигурации JDBC
  3. dwh_stg_config.update_jdbc_config_from_source - обновление существующей конфигурации JDBC
  4. dwh_stg_config.insert_kafka_config_from_source - добавление новых агрегатов Kafka
  5. dwh_stg_config.update_kafka_config_from_source - обновление агрегатов Kafka
  6. dwh_stg_config.insert_full_config_from_source - добавление конфигурации JDBC и агрегата Kafka вместе
  7. dwh_stg_config.update_full_config_from_source - обновление конфигурации JDBC и добавление/обновление агрегата Kafka вместе

Для работы в данном случае нужны только insert_full_config_from_source и update_full_config_from_source. Остальные функции используются внутри этих двух.

Примеры соответствующих вызовов функций:

SELECT *
FROM dwh_stg_config.source_db_structure str
cross JOIN json_to_record (
    dwh_stg_config.get_config_from_source(
        _db_id := str.source_db,
        _schema_name := str.schema_name,
        _table_name := str.table_name,
        _aggregate_name := null,
        _result_table_name := null
        )
) as (
    app_name                text,
    kafka_config_id         int,
    load_mode               text,
    aggregate_name          text,
    source_from_id          int,
    schema_from_name        text,
    table_from_name         text,
    select_text             text,
    filter_col              text,
    default_filter_val      text,
    source_to_id            int,
    src_jdbc_table_name     text,
    src_kafka_table_name    text,
    json_fields             text,
    table_columns_json      text,
    dbt_generate_jdbc_stg   text,
    dbt_generate_kafka_stg  text,
    dbt_generate_kafka_test text,
    dbt_generate_core       text
)
WHERE str.table_name = 'decision'
  and str.schema_name = 'statement'
  and str.source_db = 8
do
$$                    
declare
    _config_id int;
begin
    _config_id := dwh_stg_config.insert_jdbc_config_from_source(
                        _db_id 				:= 8,
                        _schema_name 		:= 'statement',
                        _table_name 		:= 'decision',
                        _aggregate_name     := null,
                        _result_table_name 	:= 'statement__decision');

    raise notice '%', _config_id::text;

end;
$$;
do
$$                    
declare
    _config_id int;
    _table_name text;
    _aggregate_name text;
begin
    select j.config_id,
            case
                when position( '_jdbc__' in j.table_name_to) = 0
                then null
                else substring(j.table_name_to from position( '_jdbc__' in j.table_name_to)+7)
            end as table_name,
            a.aggregate_name
    into _config_id, _table_name, _aggregate_name
    from dwh_stg_config.load_data_config_jdbc j
    left join dwh_stg_config.load_kafka_aggregate_tables a
        on j.table_name_to = a.jdbc_table_name
        and a.active_record = 1
    where j.config_id = 3533; --ид обновляемой записи


    _config_id := dwh_stg_config.update_jdbc_config_from_source(
                        _config_id 			:= _config_id,
                        _aggregate_name     := _aggregate_name,
                        _result_table_name 	:= _table_name
    );
    raise notice '%', _config_id::text;

end;
$$;	 
do
$$                    
declare
    _aggregate_table_id int;
begin
    _aggregate_table_id := dwh_stg_config.insert_kafka_config_from_source(
                        _config_id_jdbc := 1600,
                        _aggregate_name := 'this.is.aggregate.name');

    raise notice '%', _aggregate_table_id::text;

end;
$$;
do
$$                    
declare
    _aggregate_table_id int;
begin
    _aggregate_table_id := dwh_stg_config.update_kafka_config_from_source(
                        _aggregate_table_id := 1600,
                        _jdbc_config_id := null,
                        _aggregate_name := null,
                        _result_table_name := null
    );

    raise notice '%', _aggregate_table_id::text;

end;
$$;
do
$$                    
declare
    res text;
begin
    res := dwh_stg_config.insert_full_config_from_source(
                        _db_id 				:= 8,
                        _schema_name 		:= 'statement',
                        _table_name 		:= 'person',
                        _aggregate_name     := 'ru.gov.pfr.ecp.apso.tsr.statement.pojo.Person',
                        _result_table_name 	:= 'stm__pers__tt');

    raise notice '%', res;

end;
$$;
do
$$                    
declare
    res text;
begin
    res := dwh_stg_config.update_full_config_from_source(
                        _config_id 			:= 3627,
                        _aggregate_name 	:= null,
                        _result_table_name 	:= null
    );

    raise notice '%', res;

end;
$$;  

Для каждой таблицы очень желательно сразу указать агрегат (атрибут _aggregate_name в функциях).

  1. Если таблица НЕ НСИ, то можно воспользоваться запросом в БД Greenplum вида:
SELECT "aggregateName", count(*)
FROM dwh_src.src_kafka__iw_multipart
where lower("aggregateName") like '%tsr%'
group by "aggregateName"
order by 1, 2

Если в entityName или aggregateName есть соответствующий агрегат, то необходимо привязать его к созданному конфигу JDBC. Если есть сомнения по поводу корректности агрегата, можно проверить набор его полей запросами:

select value_json
from dwh_src.src_kafka__iw_multipart
where "aggregateName" = 'ru.gov.pfr.ecp.rs.rs_fl_ip.pojo.GpDogovor'

Набор полей должен соотноситься с набором полей в таблице-источнике.

  1. Если таблица НСИ, то название агрегата = названию таблицы в CamelCase. Название можно найти, выполнив в БД НСИ запрос вида:
select * 
from wso.dict
where lower(name) like '%table%'

Для версионных справочников НСИ, в которых имеются несколько записей по одному code с различными периодами действия startdate / enddate. При добавлении/обновлении подобных справочников при запуске функций dwh_stg_config.update_full_config_from_source или dwh_stg_config.insert_full_config_from_source необходимо передавать параметр _id_field со значением 'code'. В остальных случаях передавать параметр не нужно. Пример соответствующих вызовов функций:

do
$$                    
declare
    res text;
begin
    res := dwh_stg_config.insert_full_config_from_source(
                        _db_id 				:= 3,
                        _schema_name 		:= 'wso',
                        _table_name 		:= 'dictofficemse',
                        _aggregate_name     := 'DictOfficeMse',
                        _result_table_name 	:= 'dictofficemse',
                        _id_field           := 'code');

    raise notice '%', res;

end;
$$;
do
$$                    
declare
    res text;
begin
    res := dwh_stg_config.update_full_config_from_source(
                        _config_id 			:= 3580,
                        _aggregate_name 	:= 'DictOfficeMse',
                        _result_table_name 	:= 'dictofficemse',
                        _id_field           := 'code'
    );

    raise notice '%', res;

end;
$$;  

После добавления/обновления конфига необходимо проверить корректность данных в dwh_stg_config.load_data_config_jdbc и dwh_stg_config.load_kafka_aggregate_tables.

ВАЖНО! По умолчанию конфиг добавляется с флагом active = 0. Для того, чтобы он попал в загрузку, необходимо изменить флаг на 1.

3. Создание модели в dbt для транформации в stg и core

Инструменты:

  • средство для работы с БД (например, DBeaver) с подключением к конфигурационной БД для просмотра добавленых ранее конфигов
  • локальная консоль (python) для выполнения скриптов
  • средство просмотра файлов в файловой системе (проводник, VSCode)

Если агрегат был добавлен, то в dwh_src необходимо создать также таблицу с данными kafka. Есть два варианта сделать это:

  1. Для НЕ НСИ: Запустить DAG HOAD_..._KAFKA_load_config_iw_system с параметрами {"offsets": "{'0': -2}", "aggregate_name": "aggr.name"} - указать название добавляемого агрегата (не забыть перед этим запустить DAG HOAD_..._update_config_file).
  2. Для НСИ, либо если для НЕ НСИ по первому варианту таблица не создалась: использовать запрос sql_scripts/old/114_select_to_create_kafka_table.sql. Запрос необходиимо выполнить в конфигурационной БД. В последнем столбце будет SQL-код для создания таблицы. Его необходимо выполнить в БД Greenplum.

На предыдущем шаге в конфигурационную таблицу в том числе добавляются вызовы скриптов генерации моделей dbt.

  1. Модель jdbc stg.

Для создания модели открыть в консоли папку dbt/py_scripts, выполнить команду из dwh_stg_config.load_data_config_jdbc.dbt_to_stg. В результате выполнения команды в папке dbt/models/stg/jdbc/generated_views будет создана необходимая модель.

  1. Модель kafka stg.

Если агрегат неизвестен и не был указан в конфигурации, то этот шаг пропускается.

Для создания модели открыть в консоли папку dbt/py_scripts, выполнить команду из dwh_stg_config.load_kafka_aggregate_tables.dbt_to_stg. В результате выполнения команды в папке dbt/models/stg/kafka/generated_views будет создана необходимая модель.

2.1. Тест данных kafka.

Если агрегат неизвестен и не был указан в конфигурации, то этот шаг пропускается.

Для создания теста открыть в консоли папку dbt/py_scripts, выполнить команду из dwh_stg_config.load_kafka_aggregate_tables.dbt_test. В результате выполнения команды в папке dbt/tests/stg/kafka/generated_tests будет создан необходимый тест.

Описание генератора тестов находится в dbt/tests/stg/README.md.

  1. Модель core.

Для создания модели открыть в консоли папку dbt/py_scripts, выполнить команду из dwh_stg_config.load_data_config_jdbc.dbt_to_core. В результате выполнения команды в папке dbt/models/core/generated_tables будет создана необходимая модель.

  1. Проверить созданные модели и тесты и, если всё ок, перенести их в соответствующие папки модуля, к которому они относятся.

4.1. В случае, если в модели необходимо создать индексы на определенные поля, которые могут потребоваться в витринах при соединении таблиц или в фильтрах, необходимо дополнить поле post_hook следующей конструкцией - {{create_index(<schema_name>, <table_name>, <column_name>, <_number>)}}, где

  • <schema_name> - название схемы, в которой распологается модель
  • <table_name> - название модели
  • <column_name> - название индексируемой колонки(можно создать составной индекс, указав колонки через запятую - 'column1,column2,column3', при этом пробелы использовать нельзя, т.к. эта переменная так же включается в название)
  • <_number> - порядковый номер индекса, начинается с 0 (по канону)

Создаются индексы только типа B-дерево

При использовании нескольких макросов в post_hook необходимо разделять их точкой с запятой (;) - post_hook: "{{macros1()}} ; {{macros2()}}"

ВАЖНО! Созданные модели являются только шаблонами, при необходимости в них можно вносить изменения.

4. Загрузка таблицы JDBC

Инструменты - Airflow

  1. Запустить DAG HOAD_<среда>_update_config_file для загрузки актуальной конфигурации из БД в конфигурационный файл dag_utils/config_file_data.py
  2. DAG'и в настоящий момент распределяются по модулям. Например, в DAG HOAD_<среда>_JDBC_load_TSR попадают конфиги, app_name которых начинается на "tsr". Если таблица относится к модулю, для которого уже создан DAG, то необходимо открыть этот DAG и посмотреть, что соответствующие таски с указанной таблицей появились. Если такого DAG нет, его необходимо создать по аналогии с ранее созданными DAG.
  3. Запустить выбранный DAG, либо дождаться его выполнения, если для него установлено расписание, проверить результат его выполнения.
  4. Ожидаемый результат DAG в целевой БД:
    1. Должна создаться/обновиться таблица, указанная в dwh_stg_config.load_data_config_jdbc.table_name_to
    2. Должны создаться/обновиться объекты, созданные в dbt (вью в схеме dwh_stg, таблица в схеме dwh_core). ВАЖНО! На данном этапе п.2 не выполнится, т.к. модели dbt ещё не загружены на стенд Airflow. Для создания объектов в БД необходимо локально запустить dbt командой dbt run --select tag:название_тега_сущности

5. Загрузка таблицы Kafka

Инструменты - Airflow

  1. Запустить DAG HOAD_<среда>_update_config_file для загрузки актуальной конфигурации из БД в конфигурационный файл dag_utils/config_file_data.py.
  2. Если при создании конфигурации был указан агрегат в kafka, то изменения по нему загружаются в рамках DAG HOAD_<среда>_KAFKA_load_config_iw_system.
  3. Запустить указанный DAG, либо дождаться его выполнения, если для него установлено расписание, проверить результат его выполнения.
  4. Ожидаемый результат DAG в целевой БД:
    1. Должна создаться/обновиться таблица, указанная в dwh_stg_config.load_kafka_aggregate_tables.kafka_table_name.
    2. Должны создаться/обновиться объекты, созданные в dbt (вью в схеме dwh_stg, таблица в схеме dwh_core) ВАЖНО! На данном этапе п.2 не выполнится, т.к. модели dbt ещё не загружены на стенд Airflow. Для создания объектов в БД необходимо локально запустить dbt командой dbt run --select tag:название_тега_сущности

6. Создание витрин

Инструменты:

  • средство для работы с БД (например, DBeaver) с подключением к целевой БД
  • локальный редактор файлов (Notepad++, VSCode)

Необходимый README создан в папке dbt/models/marts.

  1. Написать необходимые запросы для формирования витрины, проверить.
  2. Создать модель в dbt в папке dbt/models/marts по аналогии с ранее созданными витринами.
  3. Указать описание витрины в файле dbt/models/marts/dwh_marts.yml

Package Sidebar

Install

npm i dwh-service

Weekly Downloads

2

Version

1.0.3

License

MIT

Unpacked Size

24.4 kB

Total Files

2

Last publish

Collaborators

  • vgeneralov