惯性聚合 高效追踪和阅读你感兴趣的博客、新闻、科技资讯
阅读原文 在惯性聚合中打开

推荐订阅源

小众软件
小众软件
N
News and Events Feed by Topic
A
About on SuperTechFans
aimingoo的专栏
aimingoo的专栏
The Cloudflare Blog
H
Heimdal Security Blog
Schneier on Security
Schneier on Security
Engineering at Meta
Engineering at Meta
Google Online Security Blog
Google Online Security Blog
宝玉的分享
宝玉的分享
AI
AI
The GitHub Blog
The GitHub Blog
MongoDB | Blog
MongoDB | Blog
www.infosecurity-magazine.com
www.infosecurity-magazine.com
The Last Watchdog
The Last Watchdog
T
Troy Hunt's Blog
S
Security @ Cisco Blogs
H
Hacker News: Front Page
F
Fortinet All Blogs
博客园_首页
S
Secure Thoughts
N
News and Events Feed by Topic
P
Proofpoint News Feed
Microsoft Azure Blog
Microsoft Azure Blog
I
InfoQ
Spread Privacy
Spread Privacy
Hacker News - Newest:
Hacker News - Newest: "LLM"
cs.CL updates on arXiv.org
cs.CL updates on arXiv.org
C
Check Point Blog
Hugging Face - Blog
Hugging Face - Blog
Hacker News: Ask HN
Hacker News: Ask HN
C
CXSECURITY Database RSS Feed - CXSecurity.com
酷 壳 – CoolShell
酷 壳 – CoolShell
Stack Overflow Blog
Stack Overflow Blog
L
LINUX DO - 最新话题
Exploit-DB.com RSS Feed
Exploit-DB.com RSS Feed
S
Schneier on Security
Know Your Adversary
Know Your Adversary
OSCHINA 社区最新新闻
OSCHINA 社区最新新闻
Scott Helme
Scott Helme
P
Privacy & Cybersecurity Law Blog
S
Securelist
freeCodeCamp Programming Tutorials: Python, JavaScript, Git & More
O
OpenAI News
K
KPMG report finds enterprise disconnect between AI and its ROI | CIO
PCI Perspectives
PCI Perspectives
L
LangChain Blog
雷峰网
雷峰网
Security Archives - TechRepublic
Security Archives - TechRepublic
V2EX - 技术
V2EX - 技术

Все публикации подряд на Хабре

Ловим музу за клавиатуру: как айтишнику стать автором Что умеет Midjourney в 2026? Мой немного грустный разбор этого шикарного инструмента Никто не любит писать тесты, но ИИ может исправить это IPv8 выглядит как мечта. Поэтому почти наверняка не взлетит Производители вернули в продажу материнки с DDR3. Что происходит? Управление агентом с телефона через Telegram теперь в KodaCode От координации к лидерству: как меняется роль руководителя разработки Я сделала родителям бизнес вместо пенсии: зарабатываем 70 тысяч, мама не даёт продать В три раза быстрее приемка товара и оптимизация трудозатрат на 73%: как «РСТ-Инвент» помог Gulliver Group ИИ-шечный мир победил? О влиянии искусственного интеллекта на игропром Кремль снижает давление на Телеграмм пока Европа строит интернет по паспорту Как CEO, CTO и CIO за 8 часов собрали ИИ-директора, который умеет держать позицию под давлением Как (не) потерять домен за выходные Вместо 8 разных VPS: как я организовал практику студентам на одном сервере Почему твой Open Source проект не замечают? R&D: искусство управления неопределенностью в разработке AI-дефляция: вакансий для разработчиков больше, а рост зарплат — худший за 15 лет Мы отдали управление роботами OpenClaw. Что из этого вышло Галактический ID: система идентификации для всех форм разумной жизни Шесть основ бизнес-анализа: начинаем с вопроса «Кто в игре?» Код-ревью, в котором дело не в коде Данные переехали. Команда — нет Системной подход к сдаче OSWE в 2025 Почему комната управления реактором покрашена в цвет морской пены 4 YAML-файла вместо PySpark: как аналитикам строить пайплайны без разработчиков LLM-агент для поиска свободных доменов: автоматизируем подбор Когда, зачем и как правильно начинать новую сессию в Claude Code? Как я заставил нейросеть писать макросы для FreeCAD Анатомия ИИ‑агента для подбора персонала. От тысячи резюме к топ‑10 за минуты Опыт разработчика как экономика внимания Автономность как точка невозврата: кто будет субъектом в цифровом будущем Обучение ИИ в «диких» условиях: как рутинные действия превращаются в датасеты Как измерить LLM для задач кибербеза: обзор открытых бенчмарков Где хранить код? Сравнение GitHub, GitLab и Bitbucket Математика объясняет, почему нормальное распределение встречается повсюду Почему ваш FinOps не работает: 12 тезисов от практиков Как подписать проектную документацию УКЭП с использованием бесплатных лицензий Pilot Адаптивное администрирование Sigla Vision Я грузил уран в бочки, а потом 20 лет строил ИТ в атомной отрасли Чем позвонить с Эвереста? История и обзор спутниковой связи. Часть 2 Как языковая модель помогает контролировать качество инструктажей по охране труда в металлургии Как не передать на desktop свой IP в РКН Анатомия SAP Privileges: как устроено управление правами в macOS MoneyDev: Сказка про три главных слова Обновлённый токенизатор видео K-VAE 2.0 от Сбера Как сделать диспетчеризацию дома на 1284 квартиры почти бесплатно Как мы разогнали железную дорогу Мы дали агентам рутину. Теперь надо решить — что делать с освободившимся временем Токсичный контент, промпт-хакинг и защита ИИ — всё о Guardrails для LLM Умный город начинается с точного взгляда: как «Фалькон Тех» меняет пространство к лучшему Навайбкодил приложение для анализа графов Почему Дюну так интересно читать? Упрощаем работу с рутиной или как стать Гендальфом Белым Деконструкция Go: CPU, RAM и что там происходит. Go Assembler база. Часть 1.1 Какие профессии исчезнут из-за ИИ, а какие появятся? И что с этим делать Как мы построили IT-отдел, где хочется расти: архитектурные встречи, прозрачные метрики и книжные подарки Rufler: Делаем из Claude Code автономный рой через один YAML-конфиг Sing-box и белый список приложений Как построить надёжный обмен сообщениями в микросервисах: лучшие практики для enterprise OpenAI строит MLM-пирамиду, а McKinsey и Accenture помогают ей в этом Дом, который не построил Фишер (Часть 2) «Сверхзвуковой математик» против «Вдумчивого логиста»: битва алгоритмов 3D-упаковки Мультимодальные модели – грубый и дорогой инструмент Разговоры ничего не стоят. Код тоже Проверки физических лиц: с кого начнет ФНС Топ-10 бесплатных нейросетей для создания видео в 2026 году Первые слои кода: как наши решения сегодня определяют архитектуру ИИ на десятилетия Разработка нового статического анализатора: PVS-Studio JavaScript Поиск уязвимостей ПО: базовый минимум или роскошный максимум Почему оценка персонала не работает как инструмент управления Как мы разработали ИИ-ассистента и сократили рутину продуктовой команды на 50% Как я ушел из найма, нажарил косточек и продал на маркетплейсах на 168 млн в год Когда 1С:ERP уже внедрена, а нормального производственного плана всё ещё нет Как я сделал Claude мультимодальным, подключив к нему Qwen Omni Как приглашение на вакансию мечты превращается в атаку Infrastructure as Code: философия и лучшие практики IaC Тестируем Yandex Code Assistant на задаче, в которой нужно хранить секреты nxs-universal-chart v3.0: новое поколение универсального Helm-чарта Callback Injection: Техника, которая отправила Microsoft Defender в глухой нокаут «Все идеи на стол»: митап как способ вывести проект из тупика Сегодня я узнал нечто новое о GPU благодаря багу в своей игре Как заставить LLM ̶ ̶г̶а̶л̶л̶ю̶ ̶ эволюционировать Карта событий как фундамент аналитики: практический кейс для E-commerce Что выбрать для AI: x86, ARM или RISC-V? Дайджест железа за март Роль соматических мутаций в развитии аутоиммунных заболеваний: путь к избирательной терапии Mythos от Anthropic — тревожный сигнал для всех, а не только для банков Guardrails для LLM на Java: как приручить промпт‑инъекции и токсичные ответы Green-VLA: как мы собрали VLA-модель для реального антропоморфного робота и не потеряли обобщение Финансовая гонка вооружений: почему умные люди добровольно в ней участвуют Эра ИИ-агентов наступила: выбираем лучшего цифрового сотрудника # Практический опыт внедрения WinCC Redundancy на производственном предприятии Сделал MVP за 3 дня, а потом неделю прикручивал оплату. Оно того стоило? Физика против Маска: почему Starship V3 может оказаться ещё одной катастрофой Нефть Венесуэлы: крупнейшие запасы в мире, но не крупнейшая нефтяная держава JPA 4. Переосмысление Hibernate Почему зеркальная фотокамера Nikon D5 десятилетней давности идеально подошла для миссии «Артемида-2» Проект «Уровень-Спутник» или как мы сделали платформу для гидрологов «Замедлиться, чтобы ускориться»: почему ИИ повышает цену ошибок в требованиях и архитектуре Как с нуля поднять трафик IT-компании на 1657% при бюджете 55 тыс. и выжить Pixel-perfect Downsampling — идеальная отрисовка 50 миллионов точек без потерь
Строим машину времени для данных (SCD-2) на движке Trino под управлением Airflow
VasiliyS178 · 2026-05-19 · via Все публикации подряд на Хабре

Уровень сложностиСредний

Время на прочтение11 мин

Охват и читатели14

Туториал

Привет, на связи Василий Самарин, ведущий инженер данных в Х5 Tech. Это моя вторая статья по теме построения SCD-2-таблиц. Если вы еще не знакомы с SCD-2-таблицами, то рекомендую заглянуть в мою первую статью, где рассказывается про то, когда и зачем можно и нужно их использовать, и подробно разбирается пример для PySpark и Hive.

Сегодня SCD-2-таблицы не только остаются актуальными для медленно меняющихся данных, но и, на мой взгляд, становятся гораздо проще в реализации благодаря новым технологиям и инструментам.

С того момента, когда я писал свою предыдущую статью про построение SCD-2-таблиц на PySpark 2.4.4 и Hive, прошло около года, и мне снова поручили пересобрать эту же витрину, но уже в ходе миграции в наше новое хранилище данных. Да, в X5 постоянно что-то меняется, внедряются новые технологии и инструменты. Это одна из причин, почему мне нравится здесь работать.

Итак, в этой статье мы будем:

  • строить Iceberg-таблицы с типом SCD-2 с помощью Trino с использованием SQL и Python;

  • попутно освоим прекрасные функции merge, MD5 и другие полезные инструменты;

  • напишем свой собственный оператор для Airflow для автоматизации ETL-процесса.

Полный код SQL-запросов оператора для Airflow

Полный код SQL-запросов для создания и наполнения всех необходимых исходных таблиц, итеративного наполнения SCD-2-таблицы данными, а также код оператора для Airflow вы найдёте в моём репозитории.

Для понимания общего контекста сначала немного расскажу про Lakehouse DMP 2.0.

Наш новый Lakehouse

Текущее хранилище данных на базе Greenplum исчерпало возможности масштабирования. У централизованной модели управления были узкие места — команды не справлялись с бэклогом, а сложный процесс внесения изменений замедлял поставку дата-продуктов. Кроме того, существующая архитектура не позволяла обеспечивать необходимую оперативность данных (T-1) из-за многоэтапной «перекладки» информации между системами хранения.

DMP 2.0 решает эти проблемы переходом на архитектуру Lakehouse с декомпозицией на независимые дата-хабы (Data Mesh). Технологически разделяется слой хранения данных (object storage на базе открытых стандартов) и вычислительный слой (Trino, Spark, Flink), что обеспечивает гибкое горизонтальное масштабирование без крупных инфраструктурных вложений. Организационно вместо единого хранилища данных внедряется федеративная модель: автономные команды-владельцы дата-хабов самостоятельно разрабатывают и поддерживают дата-продукты, публикуя данные через формализованные дата-контракты. Это снимает нагрузку с центральной команды и ускоряет time-to-market для аналитических решений.

Ключевые принципы нового распределённого хранилища:

  • наблюдаемость — данные и потоки описаны и находятся под мониторингом;

  • компонуемость данных — можно объединять данные из различных хабов в одном запросе;

  • строгое разграничение ответственности — внедрение дата-контрактов;

  • отсутствие дублирования данных за счёт универсальности Trino.

Технологический стек построен на компонентах с открытым исходным кодом (Trino, Iceberg, MinIO, Airflow).

Если вам будет интересно узнать подробности про наш новый Lakehouse, то напишите, пожалуйста, об этом в комментариях. Мы сделаем про него отдельную статью.

Итак, в описанной среде нужно было построить SCD-2-таблицу с использованием SQL-движка Trino. Оказалось, что у нас пока нет универсального загрузчика для сборки таких таблиц, поэтому я реализовал свой алгоритм.

Общая логика сборки

Основная задача была в том, чтобы на вход получать очередной срез данных (состояние на расчётную дату), который рассчитывается по расписанию, и затем добавлять его в SCD-2-таблицу. А для обработки накопленных ранее срезов мы хотели предусмотреть возможность их последовательной загрузки на этапе первичного наполнения SCD-2-таблицы.

Чтобы сборка была прозрачной и поддерживаемой, весь ETL-процесс разделили на 7 простых последовательных шагов.

Шаг 1. Подготовка среза для обработки.

Шаг 2. Подготовка среза предыдущего состояния.

Шаг 3. Выявление новых записей.

Шаг 4. Выявление удалённых записей.

Шаг 5. Извлечение изменившихся записей для их обновления в целевой таблице.

Шаг 6. Извлечение изменившихся записей для их закрытия в целевой таблице.

Шаг 7. Сборка всех записей для вставки в целевую таблицу.

Шаг 8. Запись в целевую SCD-2-таблицу.

Каждый шаг — это отдельный SQL-запрос для Trino. Результаты работы на шагах 1-7 лучше сохранять во временные таблицы, а на шаге 8 сделать MERGE данных в целевую таблицу. Как показала практика, сохранение промежуточных результатов очень удобно для диагностики ошибок.

Для автоматизации запуска ETL-процесса по расписанию мы будем использовать Airflow 2.11.0.

Пройдёмся по шагам и кратко разберём наиболее важные моменты.

Шаги 1-2

На шагах 1 и 2 нам нужно добавить технические колонки в обрабатываемый срез данных. Для нас это будет результат регулярного расчета (last_slice в моем примере) или срез из исторической таблицы (hist_slice).

То же самое нужно сделать со снимком, отражающим предыдущее состояние. Его мы будем брать из нашей целевой SCD-2-таблицы.

Вот что нам нужно добавить:

  • valid_from_dttm — дата начала действия записи;

  • valid_to_dttm — дата окончания действия записи;

  • hashdiff_key — хеш-сумма для быстрого поиска изменившихся записей. Будем рассчитывать её на лету.

Чтобы предусмотреть возможность использования одного и того же скрипта для первичной и регулярной загрузок, добавим в шаблон запроса для шага 1 следующие параметры:

  • reload_flg — признак того, что мы запускаем первичную сборку или делаем загрузку истории;

  • custom_dt — дата, за которую мы хотим взять срез из таблицы с рассчитанными ранее срезами. В случае регулярной загрузки — это будет текущая дата.

Эти параметры потом будут передаваться в оператор Airflow.

Вот как будут выглядеть наши скрипты для шагов 1 и 2.

Шаг 1. Подготовка среза для загрузки в целевую таблицу
DROP TABLE IF EXISTS catalog_name.schema_name.current_slice;
CREATE TABLE catalog_name.schema_name.current_slice AS 
WITH last_slice AS (
SELECT	
	customer_id,    
    mobile_phone_flg,
    email_flg,        
    sms_consent_flg,
    email_consent_flg,
    push_consent_flg,    
    LOWER(
	    TO_HEX(
            MD5(
                TO_UTF8(
                    CONCAT_WS(
                        '|',                        
                        CAST(customer_id as VARCHAR),
                        CAST(mobile_phone_flg as VARCHAR),
                        CAST(email_flg as VARCHAR),
                        CAST(sms_consent_flg as VARCHAR),
                        CAST(email_consent_flg as VARCHAR),
                        CAST(push_consent_flg as VARCHAR)                       
                    )
                )
            )
        )
    ) as hashdiff_key,
    CAST(CAST(dataflow_dttm AS DATE) AS TIMESTAMP) AS valid_from_dttm,
    CAST('5999-01-01 00:00:00' AS TIMESTAMP) AS valid_to_dttm
FROM
	catalog_name.schema_name.customer_consents
WHERE
	CAST({{ params.reload_flg }} AS VARCHAR) = '0'
)
, hist_slice AS (
SELECT
	customer_id,    
    mobile_phone_flg,
    email_flg,        
    sms_consent_flg,
    email_consent_flg,
    push_consent_flg,    
    LOWER(
	    TO_HEX(
            MD5(
                TO_UTF8(
                    CONCAT_WS(
                        '|',                        
                        CAST(customer_id as VARCHAR),
                        CAST(mobile_phone_flg as VARCHAR),
                        CAST(email_flg as VARCHAR),
                        CAST(sms_consent_flg as VARCHAR),
                        CAST(email_consent_flg as VARCHAR),
                        CAST(push_consent_flg as VARCHAR)                       
                    )
                )
            )
        )
    ) as hashdiff_key,
    CAST(dataflow_dt AS TIMESTAMP) AS valid_from_dttm,
    CAST('5999-01-01 00:00:00' AS TIMESTAMP) AS valid_to_dttm
FROM
	catalog_name.schema_name.customer_consents_sliced
WHERE
	CAST({{ params.reload_flg }} AS VARCHAR) = '1'
	AND dataflow_dt = CAST('{{ params.custom_dt }}' AS DATE)
) 
SELECT
	customer_id,    
    mobile_phone_flg,
    email_flg,        
    sms_consent_flg,
    email_consent_flg,
    push_consent_flg,    	
    hashdiff_key,
    valid_from_dttm,
    valid_to_dttm
FROM
    last_slice
UNION ALL
SELECT
    customer_id,    
    mobile_phone_flg,
    email_flg,        
    sms_consent_flg,
    email_consent_flg,
    push_consent_flg,    	
    hashdiff_key,
    valid_from_dttm,
    valid_to_dttm
FROM
    hist_slice
;
Шаг 2. Подготовка среза предыдущего состояния
DROP TABLE IF EXISTS catalog_name.schema_name.previous_slice;
CREATE TABLE catalog_name.schema_name.previous_slice AS 
SELECT
	customer_id,    
    mobile_phone_flg,
    email_flg,        
    sms_consent_flg,
    email_consent_flg,
    push_consent_flg,    
    LOWER(
	    TO_HEX(
            MD5(
                TO_UTF8(
                    CONCAT_WS(
                        '|',                        
                        CAST(customer_id AS VARCHAR),
                        CAST(mobile_phone_flg AS VARCHAR),
                        CAST(email_flg AS VARCHAR),
                        CAST(sms_consent_flg AS VARCHAR),
                        CAST(email_consent_flg AS VARCHAR),
                        CAST(push_consent_flg AS VARCHAR)                       
                    )
                )
            )
        )
    ) AS hashdiff_key,
    valid_from_dttm,
    valid_to_dttm
FROM
	catalog_name.schema_name.customer_consents_versioned
WHERE
	valid_to_dttm = CAST('5999-01-01 00:00:00' AS TIMESTAMP)
;

На выходе мы получим два подготовленных среза данных:

  • срез для обработки (current_slice);

  • срез предыдущего состояния (previous_slice).

Шаги 3-4

На шагах 3 и 4 мы делаем JOIN двух срезов из шага 1 по первичному ключу (customer_id в нашем примере) и сохраняем результаты в промежуточные таблицы. Первичный ключ может быть композитным. Если хотя бы в одной из колонок ключа есть пустые значения, то нужно иметь ввиду, что такие записи будут отброшены при соединении. Если вы не хотите этого допустить, то нужно обработать NULL-значения в каждой из колонок составного ключа, например, с помощью COALESCE. Еще вариант — объединить все колонки составного ключа и посчитать хеш, как мы это делали на шаге 1 для расчета поля hashdiff_key, а потом уже использовать этот ключ для JOIN.

Шаг 3. Новые записи относительно снимка предыдущего состояния
DROP TABLE IF EXISTS catalog_name.schema_name.appended_rows;
CREATE TABLE catalog_name.schema_name.appended_rows AS
SELECT
	cs.customer_id,    
    cs.mobile_phone_flg,
	cs.email_flg,        
    cs.sms_consent_flg,
    cs.email_consent_flg,
    cs.push_consent_flg,
    cs.valid_from_dttm,
    cs.valid_to_dttm
FROM
	catalog_name.schema_name.current_slice cs
LEFT JOIN
	catalog_name.schema_name.previous_slice ps ON cs.customer_id = ps.customer_id
WHERE
	ps.customer_id IS NULL
;
Шаг 4. Удаленные записи
DROP TABLE IF EXISTS catalog_name.schema_name.deleted_rows;
CREATE TABLE catalog_name.schema_name.deleted_rows AS
SELECT
	ps.customer_id,    
    ps.mobile_phone_flg,
	ps.email_flg,        
    ps.sms_consent_flg,
    ps.email_consent_flg,
    ps.push_consent_flg,
    ps.valid_from_dttm,
    cj.valid_to_dttm
FROM
	catalog_name.schema_name.previous_slice ps
LEFT JOIN
	catalog_name.schema_name.current_slice cs ON ps.customer_id = cs.customer_id
CROSS JOIN
	(
		SELECT valid_from_dttm AS valid_to_dttm
		FROM catalog_name.schema_name.current_slice
		LIMIT 1
	) cj
WHERE
	cs.customer_id IS NULL
;

Шаг 5

На шаге 5 мы также делаем JOIN по первичному ключу, но уже сравниваем значения в колонках hashdiff_key. Если они различаются, то берём обновлённое состояние из среза для последующего обновления данных в итоговой таблице.

Шаг 5. Изменившиеся записи для добавления
DROP TABLE IF EXISTS catalog_name.schema_name.updated_rows;
CREATE TABLE catalog_name.schema_name.updated_rows AS
SELECT
	cs.customer_id,    
    cs.mobile_phone_flg,
	cs.email_flg,        
    cs.sms_consent_flg,
    cs.email_consent_flg,
    cs.push_consent_flg,
    cs.valid_from_dttm, 
    cs.valid_to_dttm
FROM
	catalog_name.schema_name.current_slice cs
LEFT JOIN
	catalog_name.schema_name.previous_slice ps ON cs.customer_id = ps.customer_id
WHERE
	ps.hashdiff_key != cs.hashdiff_key
;

Шаг 6

На шаге 6 мы просто собираем все записи в общую таблицу перед тем, как делать MERGE в целевую таблицу.

Шаг 6. Изменившиеся записи для закрытия (обновление valid_to_dttm) в целевой таблице
DROP TABLE IF EXISTS catalog_name.schema_name.closed_rows;
CREATE TABLE catalog_name.schema_name.closed_rows AS
SELECT
	ps.customer_id,    
    ps.mobile_phone_flg,
	ps.email_flg,        
    ps.sms_consent_flg,
    ps.email_consent_flg,
    ps.push_consent_flg,
    ps.valid_from_dttm,
    cs.valid_from_dttm AS valid_to_dttm
FROM
	catalog_name.schema_name.previous_slice ps
LEFT JOIN
	catalog_name.schema_name.current_slice cs ON ps.customer_id = cs.customer_id
WHERE
	ps.hashdiff_key != cs.hashdiff_key
;

Шаг 7

На данном шаге мы просто объединяем полученные записи на предыдущих этапах в общую временную таблицу.

Шаг 7. Сборка всех записей для вставки в целевую таблицу
DROP TABLE IF EXISTS catalog_name.schema_name.load_batch;
CREATE TABLE catalog_name.schema_name.load_batch AS
SELECT
	customer_id,    
    mobile_phone_flg,
    email_flg,        
    sms_consent_flg,
    email_consent_flg,
    push_consent_flg,
    valid_from_dttm,
    valid_to_dttm
FROM 
	catalog_name.schema_name.appended_rows
UNION ALL
SELECT
	customer_id,    
    mobile_phone_flg,
    email_flg,        
    sms_consent_flg,
    email_consent_flg,
    push_consent_flg,
    valid_from_dttm,
    valid_to_dttm
FROM 
	catalog_name.schema_name.deleted_rows
UNION ALL
SELECT
	customer_id,    
    mobile_phone_flg,
    email_flg,        
    sms_consent_flg,
    email_consent_flg,
    push_consent_flg,
    valid_from_dttm,
    valid_to_dttm
FROM 
	catalog_name.schema_name.updated_rows
UNION ALL
SELECT
	customer_id,    
    mobile_phone_flg,
    email_flg,        
    sms_consent_flg,
    email_consent_flg,
    push_consent_flg,
    valid_from_dttm,
    valid_to_dttm
FROM 
	catalog_name.schema_name.closed_rows
;

Шаг 8

На шаге 8 происходит то, чего нам сильно не хватало при работе с таблицами в Hive — MERGE данных. Благодаря переходу на Iceberg-таблицы автоматически обновятся все необходимые записи в целевой таблице.

Шаг 8. Merge в целевую таблицу
MERGE INTO catalog_name.schema_name.customer_consents_versioned AS trg
USING catalog_name.schema_name.load_batch src
	ON src.customer_id = trg.customer_id
	AND src.valid_from_dttm = trg.valid_from_dttm
WHEN MATCHED AND (    
    src.mobile_phone_flg IS DISTINCT FROM trg.mobile_phone_flg
    OR src.email_flg IS DISTINCT FROM trg.email_flg
    OR src.sms_consent_flg IS DISTINCT FROM trg.sms_consent_flg
    OR src.email_consent_flg IS DISTINCT FROM trg.email_consent_flg
    OR src.push_consent_flg IS DISTINCT FROM trg.push_consent_flg
    OR src.valid_to_dttm IS DISTINCT FROM trg.valid_to_dttm
)
THEN UPDATE SET    
	mobile_phone_flg = src.mobile_phone_flg,
	email_flg = src.email_flg,        
	sms_consent_flg = src.sms_consent_flg,
	email_consent_flg = src.email_consent_flg,
	push_consent_flg = src.push_consent_flg,
	valid_to_dttm = src.valid_to_dttm,
	dataflow_dttm = DATE_TRUNC('SECOND', CURRENT_TIMESTAMP)
WHEN NOT MATCHED THEN INSERT (
	customer_id,    
    mobile_phone_flg,
    email_flg,        
    sms_consent_flg,
    email_consent_flg,
    push_consent_flg,
    valid_from_dttm,
    valid_to_dttm,
    dataflow_dttm
) 
VALUES (	
	src.customer_id,    
    src.mobile_phone_flg,
    src.email_flg,        
    src.sms_consent_flg,
    src.email_consent_flg,
    src.push_consent_flg,
    src.valid_from_dttm,
    src.valid_to_dttm,
	DATE_TRUNC('SECOND', CURRENT_TIMESTAMP)
);

Разберём, как этот скрипт устроен.

В запросе мы используем конструкцию IS DISTINCT FROM вместо оператора '<>'. Это расширение SQL корректно обрабатывает NULL. Оно доступно в Trino и некоторых других СУБД, например, PostgreSQL. В блоке WHEN MATCHED с помощью этой конструкции мы проверяем, какие значения в колонках изменились, чтобы потом обновить только их.

Автоматизация через Airflow

Разобравшись с SQL-запросами, можно попробовать собрать SCD-2-таблицу из примера. Всё необходимое для этого вы найдёте в репозитории. Чтобы лучше понять, как работают SCD-2-таблицы вы можете запускать запросы итеративно. Сначала итерация 1 из insert_data.sql, потом все шаги из scd2_build.sql, наблюдая за тем, как накапливается история изменений в целевой таблице.

После этого у вас уже не должно остаться сомнений и вопросов, а значит, пора приступать к автоматизации ETL-процесса, например, с помощью Airflow.

В зависимости от того, какую версию Airflow вы используете, можно сделать это с помощью готового TrinoOperator или написать свой. Мы разработаем свой оператор для Airflow 2.11.0, в которой TrinoOperator пока отсутствует. Для этого достаточно разместить подготовленный модуль scd2_loader.py на Python в директорию plugins и настроить коннект к Trino.

Полный код оператора также есть в репозитории, а мы разберёмся с важными моментами и параметрами запуска:

  • trg_table — целевая таблица;

  • src_table — таблица-источник;

  • key_columns — колонки составного первичного ключа;

  • eff_from — дата начала действия записей из таблицы источника. Обычно подходит дата загрузки данных в таблицу;

  • wrk_schema — схема для временного размещения промежуточных таблиц с результатами;

  • reload_flg — признак пересчёта (когда делаем обработку накопленных срезов, то выставляем его в 1);

  • partition_column — колонка, по которой делаются партиции в таблице с накопленными срезами (нужна только при первичном наполнении при reload_flg = 1);

  • custom_dt — дата, за которую берётся срез из таблицы с накопленными срезами (заполняется только при reload_flg = 1);

  • conn_id — название подключения к Trino в Airflow.

Вот какие требования к источнику данных нужно учитывать при работе с данным оператором:

  • состав колонок и их типы в таблице-источнике и целевой SCD-2-таблице должны совпадать, за исключением технических полей (valid_from_dttm, valid_to_dttm, dataflow_dttm);

  • в таблице-источнике не должно быть дублей по первичному ключу;

  • в таблице-источнике не должно быть записей с NULL-значениями в колонках составного первичного ключа (почему — смотрите пояснения к шагу 3 и 4).

В нашем операторе все рассмотренные выше SQL-запросы, были шаблонизированы. Код для выполнения в Trino генерируется на лету в зависимости от переданных параметров.

В оператор были сразу добавлены базовые проверки переданных параметров и таблиц, чтобы ещё до старта основной части оповестить пользователя об ошибках и не тратить ресурсы на бесполезные вычисления.

Остается только подготовить DAG, а дальше Airflow выполнит все шаги ETL-процесса сборки целевой таблицы автоматически.

Заключение

В итоге мы получили «машину времени», которая позволяет вернуться в любой момент жизни наших данных, а также посмотреть на то, как изменялись записи с момента их рождения. Для этого достаточно сделать фильтр по полям valid_from_dttm, valid_to_dttm, указав нужную нам дату (target_dttm) в прошлом:

SELECT *
FROM target_scd_2_table
WHERE valid_from_dttm <= target_dttm AND valid_to_dttm > target_dttm

Обратите внимание, что valid_to_dttm строго больше target_dttm для избежания получения дублей.

А для получения актуального среза на текущий момент времени достаточно всего одного фильтра:

SELECT *
FROM target_scd_2_table
WHERE valid_to_dttm = CAST('5999-01-01 00:00:00' AS TIMESTAMP)

Надеюсь, что теперь вы окончательно разобрались, как строить SCD-2-таблицы с использованием Spark или Trino SQL, чётко понимаете, когда их нужно использовать и сможете успешно их применять для удобного и экономичного хранения данных. А если вдруг остались вопросы или предложения, то обязательно пишите о них в комментариях. Буду рад научиться чему-то новому.