dwh-service
является монорепой для ELT-скриптов обработки данных и включает в себя:
ПО | Версия | Описание |
---|---|---|
Apache Airflow | 2.3.3 | Оркестрация ELT-операций |
dbt | 1.2.0 | (T)ransform-операции в SQL-нотации |
Spark, PySpark | 3.3.0 | (E)xtract- и (L)oad-операции |
ADB | 6.21.1 | Хранилище данных |
ADQM | 22.3.7 | Витрины данных |
Дополнительно в состав проекта включены конфиги docker
для развёртывания локального окружения, эмулирующего целевое. Подробную информацию о локальном стенде, разворачиваемом из docker
-образов, см. в описании.
dwh-service
├───airflow
├───dbt
├───docker
├───spark
├───sql_scripts
└───README.md
Для примера возьмём таблицу statement.decision в БД ТСР.
Необходимо загрузить таблицу в БД ХОАД, сделать необходимые преобразования, протащить её до схемы dwh_core и сделать витрину, зависящую от этой таблицы.
Если таблица уже добавлена в конфиг и нужно только обновить в ней данные, то достаточно выполнить только пункты 4 и 5.
Инструмент - локальная консоль (python
).
Используем скрипт spark_development/get_db_structure.py
. Вызываем его локально следующим образом:
python get_db_structure.py --db "ид БД ТСР в dwh_stg_config.load_data_config_sources" --sch "схема для загрузки (statement)" --meta_db "DEV или TEST"
ВАЖНО! Если БД источник - HBase, то для корректной работы необходимо установить Java (JDK), а также положить драйвер для подключения к HBase (phoenix-5.0.0-HBase-2.0-thin-client.jar) в папку spark/packages (предварительно папку надо создать)
скрипт работает только на python 3.10. Если установлена более ранняя версия python, то можно попросить загрузить кого-нибудь из коллег, у кого установлен python 3.10
В результате выполнения скрипта в таблице dwh_stg_config.source_db_structure добавятся/обновятся данные о всех таблицах в схеме statement в БД ТСР.
Инструмент - средство для работы с БД (например, DBeaver
) с подключением к конфигурационной БД.
Используемые функции:
-
dwh_stg_config.get_config_from_source
- просмотр конфигурационных данных -
dwh_stg_config.insert_jdbc_config_from_source
- добавление конфигурации JDBC -
dwh_stg_config.update_jdbc_config_from_source
- обновление существующей конфигурации JDBC -
dwh_stg_config.insert_kafka_config_from_source
- добавление новых агрегатов Kafka -
dwh_stg_config.update_kafka_config_from_source
- обновление агрегатов Kafka -
dwh_stg_config.insert_full_config_from_source
- добавление конфигурации JDBC и агрегата Kafka вместе -
dwh_stg_config.update_full_config_from_source
- обновление конфигурации JDBC и добавление/обновление агрегата Kafka вместе
Для работы в данном случае нужны только insert_full_config_from_source и update_full_config_from_source. Остальные функции используются внутри этих двух.
Примеры соответствующих вызовов функций:
SELECT *
FROM dwh_stg_config.source_db_structure str
cross JOIN json_to_record (
dwh_stg_config.get_config_from_source(
_db_id := str.source_db,
_schema_name := str.schema_name,
_table_name := str.table_name,
_aggregate_name := null,
_result_table_name := null
)
) as (
app_name text,
kafka_config_id int,
load_mode text,
aggregate_name text,
source_from_id int,
schema_from_name text,
table_from_name text,
select_text text,
filter_col text,
default_filter_val text,
source_to_id int,
src_jdbc_table_name text,
src_kafka_table_name text,
json_fields text,
table_columns_json text,
dbt_generate_jdbc_stg text,
dbt_generate_kafka_stg text,
dbt_generate_kafka_test text,
dbt_generate_core text
)
WHERE str.table_name = 'decision'
and str.schema_name = 'statement'
and str.source_db = 8
do
$$
declare
_config_id int;
begin
_config_id := dwh_stg_config.insert_jdbc_config_from_source(
_db_id := 8,
_schema_name := 'statement',
_table_name := 'decision',
_aggregate_name := null,
_result_table_name := 'statement__decision');
raise notice '%', _config_id::text;
end;
$$;
do
$$
declare
_config_id int;
_table_name text;
_aggregate_name text;
begin
select j.config_id,
case
when position( '_jdbc__' in j.table_name_to) = 0
then null
else substring(j.table_name_to from position( '_jdbc__' in j.table_name_to)+7)
end as table_name,
a.aggregate_name
into _config_id, _table_name, _aggregate_name
from dwh_stg_config.load_data_config_jdbc j
left join dwh_stg_config.load_kafka_aggregate_tables a
on j.table_name_to = a.jdbc_table_name
and a.active_record = 1
where j.config_id = 3533; --ид обновляемой записи
_config_id := dwh_stg_config.update_jdbc_config_from_source(
_config_id := _config_id,
_aggregate_name := _aggregate_name,
_result_table_name := _table_name
);
raise notice '%', _config_id::text;
end;
$$;
do
$$
declare
_aggregate_table_id int;
begin
_aggregate_table_id := dwh_stg_config.insert_kafka_config_from_source(
_config_id_jdbc := 1600,
_aggregate_name := 'this.is.aggregate.name');
raise notice '%', _aggregate_table_id::text;
end;
$$;
do
$$
declare
_aggregate_table_id int;
begin
_aggregate_table_id := dwh_stg_config.update_kafka_config_from_source(
_aggregate_table_id := 1600,
_jdbc_config_id := null,
_aggregate_name := null,
_result_table_name := null
);
raise notice '%', _aggregate_table_id::text;
end;
$$;
do
$$
declare
res text;
begin
res := dwh_stg_config.insert_full_config_from_source(
_db_id := 8,
_schema_name := 'statement',
_table_name := 'person',
_aggregate_name := 'ru.gov.pfr.ecp.apso.tsr.statement.pojo.Person',
_result_table_name := 'stm__pers__tt');
raise notice '%', res;
end;
$$;
do
$$
declare
res text;
begin
res := dwh_stg_config.update_full_config_from_source(
_config_id := 3627,
_aggregate_name := null,
_result_table_name := null
);
raise notice '%', res;
end;
$$;
Для каждой таблицы очень желательно сразу указать агрегат (атрибут _aggregate_name в функциях).
- Если таблица НЕ НСИ, то можно воспользоваться запросом в БД Greenplum вида:
SELECT "aggregateName", count(*)
FROM dwh_src.src_kafka__iw_multipart
where lower("aggregateName") like '%tsr%'
group by "aggregateName"
order by 1, 2
Если в entityName или aggregateName есть соответствующий агрегат, то необходимо привязать его к созданному конфигу JDBC. Если есть сомнения по поводу корректности агрегата, можно проверить набор его полей запросами:
select value_json
from dwh_src.src_kafka__iw_multipart
where "aggregateName" = 'ru.gov.pfr.ecp.rs.rs_fl_ip.pojo.GpDogovor'
Набор полей должен соотноситься с набором полей в таблице-источнике.
- Если таблица НСИ, то название агрегата = названию таблицы в CamelCase. Название можно найти, выполнив в БД НСИ запрос вида:
select *
from wso.dict
where lower(name) like '%table%'
Для версионных справочников НСИ, в которых имеются несколько записей по одному code с различными периодами действия startdate / enddate. При добавлении/обновлении подобных справочников при запуске функций dwh_stg_config.update_full_config_from_source или dwh_stg_config.insert_full_config_from_source необходимо передавать параметр _id_field со значением 'code'. В остальных случаях передавать параметр не нужно. Пример соответствующих вызовов функций:
do
$$
declare
res text;
begin
res := dwh_stg_config.insert_full_config_from_source(
_db_id := 3,
_schema_name := 'wso',
_table_name := 'dictofficemse',
_aggregate_name := 'DictOfficeMse',
_result_table_name := 'dictofficemse',
_id_field := 'code');
raise notice '%', res;
end;
$$;
do
$$
declare
res text;
begin
res := dwh_stg_config.update_full_config_from_source(
_config_id := 3580,
_aggregate_name := 'DictOfficeMse',
_result_table_name := 'dictofficemse',
_id_field := 'code'
);
raise notice '%', res;
end;
$$;
После добавления/обновления конфига необходимо проверить корректность данных в dwh_stg_config.load_data_config_jdbc и dwh_stg_config.load_kafka_aggregate_tables.
ВАЖНО! По умолчанию конфиг добавляется с флагом
active = 0
. Для того, чтобы он попал в загрузку, необходимо изменить флаг на 1.
Инструменты:
- средство для работы с БД (например,
DBeaver
) с подключением к конфигурационной БД для просмотра добавленых ранее конфигов - локальная консоль (
python
) для выполнения скриптов - средство просмотра файлов в файловой системе (проводник,
VSCode
)
Если агрегат был добавлен, то в dwh_src необходимо создать также таблицу с данными kafka. Есть два варианта сделать это:
- Для НЕ НСИ: Запустить DAG HOAD_..._KAFKA_load_config_iw_system с параметрами {"offsets": "{'0': -2}", "aggregate_name": "aggr.name"} - указать название добавляемого агрегата (не забыть перед этим запустить DAG HOAD_..._update_config_file).
- Для НСИ, либо если для НЕ НСИ по первому варианту таблица не создалась: использовать запрос sql_scripts/old/114_select_to_create_kafka_table.sql. Запрос необходиимо выполнить в конфигурационной БД. В последнем столбце будет SQL-код для создания таблицы. Его необходимо выполнить в БД Greenplum.
На предыдущем шаге в конфигурационную таблицу в том числе добавляются вызовы скриптов генерации моделей dbt
.
- Модель jdbc stg.
Для создания модели открыть в консоли папку dbt/py_scripts, выполнить команду из dwh_stg_config.load_data_config_jdbc.dbt_to_stg. В результате выполнения команды в папке dbt/models/stg/jdbc/generated_views будет создана необходимая модель.
- Модель kafka stg.
Если агрегат неизвестен и не был указан в конфигурации, то этот шаг пропускается.
Для создания модели открыть в консоли папку dbt/py_scripts, выполнить команду из dwh_stg_config.load_kafka_aggregate_tables.dbt_to_stg. В результате выполнения команды в папке dbt/models/stg/kafka/generated_views будет создана необходимая модель.
2.1. Тест данных kafka.
Если агрегат неизвестен и не был указан в конфигурации, то этот шаг пропускается.
Для создания теста открыть в консоли папку dbt/py_scripts, выполнить команду из dwh_stg_config.load_kafka_aggregate_tables.dbt_test. В результате выполнения команды в папке dbt/tests/stg/kafka/generated_tests будет создан необходимый тест.
Описание генератора тестов находится в dbt/tests/stg/README.md.
- Модель core.
Для создания модели открыть в консоли папку dbt/py_scripts, выполнить команду из dwh_stg_config.load_data_config_jdbc.dbt_to_core. В результате выполнения команды в папке dbt/models/core/generated_tables будет создана необходимая модель.
- Проверить созданные модели и тесты и, если всё ок, перенести их в соответствующие папки модуля, к которому они относятся.
4.1. В случае, если в модели необходимо создать индексы на определенные поля, которые могут потребоваться в витринах при соединении таблиц или в фильтрах, необходимо дополнить поле post_hook следующей конструкцией - {{create_index(<schema_name>, <table_name>, <column_name>, <_number>)}}
, где
- <schema_name> - название схемы, в которой распологается модель
- <table_name> - название модели
- <column_name> - название индексируемой колонки(можно создать составной индекс, указав колонки через запятую - 'column1,column2,column3', при этом пробелы использовать нельзя, т.к. эта переменная так же включается в название)
- <_number> - порядковый номер индекса, начинается с 0 (по канону)
Создаются индексы только типа B-дерево
При использовании нескольких макросов в post_hook необходимо разделять их точкой с запятой (;) -
post_hook: "{{macros1()}} ; {{macros2()}}"
ВАЖНО! Созданные модели являются только шаблонами, при необходимости в них можно вносить изменения.
Инструменты - Airflow
- Запустить DAG HOAD_<среда>_update_config_file для загрузки актуальной конфигурации из БД в конфигурационный файл
dag_utils/config_file_data.py
- DAG'и в настоящий момент распределяются по модулям. Например, в DAG HOAD_<среда>_JDBC_load_TSR попадают конфиги, app_name которых начинается на "tsr". Если таблица относится к модулю, для которого уже создан DAG, то необходимо открыть этот DAG и посмотреть, что соответствующие таски с указанной таблицей появились. Если такого DAG нет, его необходимо создать по аналогии с ранее созданными DAG.
- Запустить выбранный DAG, либо дождаться его выполнения, если для него установлено расписание, проверить результат его выполнения.
- Ожидаемый результат DAG в целевой БД:
- Должна создаться/обновиться таблица, указанная в dwh_stg_config.load_data_config_jdbc.table_name_to
- Должны создаться/обновиться объекты, созданные в
dbt
(вью в схеме dwh_stg, таблица в схеме dwh_core). ВАЖНО! На данном этапе п.2 не выполнится, т.к. модели dbt ещё не загружены на стенд Airflow. Для создания объектов в БД необходимо локально запустить dbt командой dbt run --select tag:название_тега_сущности
Инструменты - Airflow
- Запустить DAG HOAD_<среда>_update_config_file для загрузки актуальной конфигурации из БД в конфигурационный файл
dag_utils/config_file_data.py
. - Если при создании конфигурации был указан агрегат в kafka, то изменения по нему загружаются в рамках DAG HOAD_<среда>_KAFKA_load_config_iw_system.
- Запустить указанный DAG, либо дождаться его выполнения, если для него установлено расписание, проверить результат его выполнения.
- Ожидаемый результат DAG в целевой БД:
- Должна создаться/обновиться таблица, указанная в dwh_stg_config.load_kafka_aggregate_tables.kafka_table_name.
- Должны создаться/обновиться объекты, созданные в
dbt
(вью в схеме dwh_stg, таблица в схеме dwh_core) ВАЖНО! На данном этапе п.2 не выполнится, т.к. модели dbt ещё не загружены на стенд Airflow. Для создания объектов в БД необходимо локально запустить dbt командой dbt run --select tag:название_тега_сущности
Инструменты:
- средство для работы с БД (например,
DBeaver
) с подключением к целевой БД - локальный редактор файлов (
Notepad++
,VSCode
)
Необходимый README создан в папке dbt/models/marts.
- Написать необходимые запросы для формирования витрины, проверить.
- Создать модель в
dbt
в папке dbt/models/marts по аналогии с ранее созданными витринами. - Указать описание витрины в файле dbt/models/marts/dwh_marts.yml