Интеграция Apache Beam и ClickHouse
Apache Beam — это открытая унифицированная модель программирования, которая позволяет разработчикам определять и выполнять как пакетные, так и потоковые (непрерывные) конвейеры обработки данных. Гибкость Apache Beam заключается в его способности поддерживать широкий спектр сценариев обработки данных — от ETL (Extract, Transform, Load — извлечение, преобразование и загрузка) операций до сложной обработки событий и аналитики в режиме реального времени.
Эта интеграция использует официальный JDBC-коннектор ClickHouse в качестве базового слоя записи данных.
Пакет интеграции
Пакет, необходимый для интеграции Apache Beam и ClickHouse, поддерживается и развивается в разделе Apache Beam I/O Connectors — пакете интеграций с популярными системами хранения данных и базами данных.
Реализация org.apache.beam.sdk.io.clickhouse.ClickHouseIO находится в репозитории Apache Beam.
Настройка пакета ClickHouse для Apache Beam
Установка пакета
Добавьте следующую зависимость в используемую систему управления пакетами:
Коннектор ClickHouseIO рекомендуется использовать, начиная с Apache Beam версии 2.59.0.
Более ранние версии могут не в полной мере поддерживать функциональность коннектора.
Артефакты можно найти в официальном репозитории Maven.
Пример кода
Следующий пример считывает CSV‑файл с именем input.csv как коллекцию PCollection, преобразует его в объект Row (используя определённую схему) и вставляет в локальный экземпляр ClickHouse с помощью ClickHouseIO:
Поддерживаемые типы данных
| ClickHouse | Apache Beam | Поддерживается | Примечания |
|---|---|---|---|
TableSchema.TypeName.FLOAT32 | Schema.TypeName#FLOAT | ✅ | |
TableSchema.TypeName.FLOAT64 | Schema.TypeName#DOUBLE | ✅ | |
TableSchema.TypeName.INT8 | Schema.TypeName#BYTE | ✅ | |
TableSchema.TypeName.INT16 | Schema.TypeName#INT16 | ✅ | |
TableSchema.TypeName.INT32 | Schema.TypeName#INT32 | ✅ | |
TableSchema.TypeName.INT64 | Schema.TypeName#INT64 | ✅ | |
TableSchema.TypeName.STRING | Schema.TypeName#STRING | ✅ | |
TableSchema.TypeName.UINT8 | Schema.TypeName#INT16 | ✅ | |
TableSchema.TypeName.UINT16 | Schema.TypeName#INT32 | ✅ | |
TableSchema.TypeName.UINT32 | Schema.TypeName#INT64 | ✅ | |
TableSchema.TypeName.UINT64 | Schema.TypeName#INT64 | ✅ | |
TableSchema.TypeName.DATE | Schema.TypeName#DATETIME | ✅ | |
TableSchema.TypeName.DATETIME | Schema.TypeName#DATETIME | ✅ | |
TableSchema.TypeName.ARRAY | Schema.TypeName#ARRAY | ✅ | |
TableSchema.TypeName.ENUM8 | Schema.TypeName#STRING | ✅ | |
TableSchema.TypeName.ENUM16 | Schema.TypeName#STRING | ✅ | |
TableSchema.TypeName.BOOL | Schema.TypeName#BOOLEAN | ✅ | |
TableSchema.TypeName.TUPLE | Schema.TypeName#ROW | ✅ | |
TableSchema.TypeName.FIXEDSTRING | FixedBytes | ✅ | FixedBytes — это LogicalType, представляющий байтовый массив фиксированной длины, определённый в пакете org.apache.beam.sdk.schemas.logicaltypes |
Schema.TypeName#DECIMAL | ❌ | ||
Schema.TypeName#MAP | ❌ |
Параметры ClickHouseIO.Write
Вы можете настроить конфигурацию ClickHouseIO.Write с помощью следующих функций-сеттеров:
| Функция-сеттер параметра | Тип аргумента | Значение по умолчанию | Описание |
|---|---|---|---|
withMaxInsertBlockSize | (long maxInsertBlockSize) | 1000000 | Максимальный размер блока строк для вставки. |
withMaxRetries | (int maxRetries) | 5 | Максимальное число повторных попыток для неуспешных вставок. |
withMaxCumulativeBackoff | (Duration maxBackoff) | Duration.standardDays(1000) | Максимальная суммарная продолжительность интервала ожидания (backoff) для повторных попыток. |
withInitialBackoff | (Duration initialBackoff) | Duration.standardSeconds(5) | Начальная продолжительность интервала ожидания (backoff) перед первой повторной попыткой. |
withInsertDistributedSync | (Boolean sync) | true | Если true, синхронизирует операции вставки для распределённых таблиц. |
withInsertQuorum | (Long quorum) | null | Количество реплик, необходимых для подтверждения операции вставки. |
withInsertDeduplicate | (Boolean deduplicate) | true | Если true, включена дедупликация для операций вставки. |
withTableSchema | (TableSchema schema) | null | Схема целевой таблицы ClickHouse. |
Ограничения
При использовании коннектора учитывайте следующие ограничения:
- На данный момент поддерживается только операция Sink. Коннектор не поддерживает операцию Source.
- ClickHouse выполняет дедупликацию при вставке в таблицу
ReplicatedMergeTreeили в таблицуDistributed, построенную поверхReplicatedMergeTree. Без репликации вставка в обычную MergeTree может привести к дубликатам, если вставка завершилась с ошибкой, а затем была успешно повторена. Однако каждый блок вставляется атомарно, и размер блока можно настроить с помощьюClickHouseIO.Write.withMaxInsertBlockSize(long). Дедупликация достигается за счёт использования контрольных сумм вставленных блоков. Дополнительные сведения о дедупликации см. в разделах Deduplication и Deduplicate insertion config. - Коннектор не выполняет никаких DDL-операций; следовательно, целевая таблица должна существовать до вставки.
Связанные материалы
- Документация по классу
ClickHouseIO(документация). - Репозиторий с примерами на GitHub: clickhouse-beam-connector.