CDC из DynamoDB в ClickHouse
На этой странице описано, как настроить CDC из DynamoDB в ClickHouse с использованием ClickPipes. В эту интеграцию входят два компонента:
- Начальный снимок через S3 ClickPipes
- Обновления в режиме реального времени через Kinesis ClickPipes
Данные будут поступать в таблицу на движке ReplacingMergeTree. Этот движок таблицы обычно используется для сценариев CDC, чтобы обеспечить применение операций обновления. Подробнее об этом подходе можно прочитать в следующих статьях блога:
- Change Data Capture (CDC) с PostgreSQL и ClickHouse — Часть 1
- Change Data Capture (CDC) с PostgreSQL и ClickHouse — Часть 2
1. Настройка потока Kinesis
Сначала включите поток Kinesis для таблицы DynamoDB, чтобы фиксировать изменения в режиме реального времени. Это нужно сделать до создания снимка, чтобы не пропустить какие-либо данные. Руководство AWS доступно по ссылке здесь.

2. Создание снимка
Теперь создадим снимок таблицы DynamoDB. Это можно сделать с помощью экспорта AWS в S3. Руководство AWS доступно здесь. Вам нужен «Full export» в формате DynamoDB JSON.

3. Загрузка снимка в ClickHouse
Создайте необходимые таблицы
Данные снимка из DynamoDB будут выглядеть примерно так:
Обратите внимание, что данные имеют вложенную структуру. Нам нужно будет привести их к плоскому виду перед загрузкой в ClickHouse. Это можно сделать с помощью функции JSONExtract в ClickHouse в материализованном представлении.
Нам нужно создать три таблицы:
- Таблица для хранения «сырых» данных из DynamoDB
- Таблица для хранения окончательно развёрнутых данных (таблица назначения)
- Материализованное представление для преобразования данных в плоский формат
Для приведённых выше примерных данных из DynamoDB таблицы в ClickHouse будут выглядеть следующим образом:
Есть несколько требований к целевой таблице:
- Эта таблица должна быть таблицей
ReplacingMergeTree - В таблице должен быть столбец
version- На последующих шагах мы сопоставим поле
ApproximateCreationDateTimeиз потока Kinesis со столбцомversion.
- На последующих шагах мы сопоставим поле
- Таблица должна использовать ключ партиционирования в качестве ключа сортировки (задаваемого в
ORDER BY)- Строки с одинаковым ключом сортировки будут очищаться от дубликатов на основе столбца
version.
- Строки с одинаковым ключом сортировки будут очищаться от дубликатов на основе столбца
Создание snapshot ClickPipe
Теперь вы можете создать ClickPipe для загрузки snapshot-данных из S3 в ClickHouse. Следуйте руководству по S3 ClickPipe здесь, но используйте следующие настройки:
- Ingest path: вам нужно определить путь к экспортированным JSON-файлам в S3. Путь будет выглядеть примерно так:
- Формат: JSONEachRow
- Таблица: ваша таблица снимка (например,
default.snapshotв приведённом выше примере)
После её создания данные начнут поступать в таблицу снимка и целевую таблицу. Вам не нужно дожидаться окончания загрузки снимка, чтобы перейти к следующему шагу.
4. Создайте Kinesis ClickPipe
Теперь мы можем настроить Kinesis ClickPipe для захвата изменений в реальном времени из потока Kinesis. Следуйте руководству по Kinesis ClickPipe здесь, но используйте следующие настройки:
- Stream: Поток Kinesis, использованный на шаге 1
- Table: Ваша целевая таблица (например,
default.destinationв примере выше) - Flatten object: true
- Column mappings:
ApproximateCreationDateTime:version- Отобразите остальные поля в соответствующие целевые столбцы, как показано ниже

5. Очистка (необязательно)
После завершения снапшотного ClickPipe вы можете удалить таблицу снимка и материализованное представление.