Перейти к основному содержанию
Перейти к основному содержанию

Интеграция Apache Beam и ClickHouse

ClickHouse Supported

Apache Beam — это открытая унифицированная модель программирования, которая позволяет разработчикам определять и выполнять как пакетные, так и потоковые (непрерывные) конвейеры обработки данных. Гибкость Apache Beam заключается в его способности поддерживать широкий спектр сценариев обработки данных — от ETL (Extract, Transform, Load — извлечение, преобразование и загрузка) операций до сложной обработки событий и аналитики в режиме реального времени.
Эта интеграция использует официальный JDBC-коннектор ClickHouse в качестве базового слоя записи данных.

Пакет интеграции

Пакет, необходимый для интеграции Apache Beam и ClickHouse, поддерживается и развивается в разделе Apache Beam I/O Connectors — пакете интеграций с популярными системами хранения данных и базами данных. Реализация org.apache.beam.sdk.io.clickhouse.ClickHouseIO находится в репозитории Apache Beam.

Настройка пакета ClickHouse для Apache Beam

Установка пакета

Добавьте следующую зависимость в используемую систему управления пакетами:

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-clickhouse</artifactId>
    <version>${beam.version}</version>
</dependency>
Рекомендуемая версия Beam

Коннектор ClickHouseIO рекомендуется использовать, начиная с Apache Beam версии 2.59.0. Более ранние версии могут не в полной мере поддерживать функциональность коннектора.

Артефакты можно найти в официальном репозитории Maven.

Пример кода

Следующий пример считывает CSV‑файл с именем input.csv как коллекцию PCollection, преобразует его в объект Row (используя определённую схему) и вставляет в локальный экземпляр ClickHouse с помощью ClickHouseIO:


package org.example;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.clickhouse.ClickHouseIO;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.joda.time.DateTime;

public class Main {

    public static void main(String[] args) {
        // Создайте объект Pipeline.
        Pipeline p = Pipeline.create();

        Schema SCHEMA =
                Schema.builder()
                        .addField(Schema.Field.of("name", Schema.FieldType.STRING).withNullable(true))
                        .addField(Schema.Field.of("age", Schema.FieldType.INT16).withNullable(true))
                        .addField(Schema.Field.of("insertion_time", Schema.FieldType.DATETIME).withNullable(false))
                        .build();

        // Примените преобразования к конвейеру.
        PCollection<String> lines = p.apply("Чтение строк", TextIO.read().from("src/main/resources/input.csv"));

        PCollection<Row> rows = lines.apply("Преобразование в Row", ParDo.of(new DoFn<String, Row>() {
            @ProcessElement
            public void processElement(@Element String line, OutputReceiver<Row> out) {

                String[] values = line.split(",");
                Row row = Row.withSchema(SCHEMA)
                        .addValues(values[0], Short.parseShort(values[1]), DateTime.now())
                        .build();
                out.output(row);
            }
        })).setRowSchema(SCHEMA);

        rows.apply("Запись в ClickHouse",
                        ClickHouseIO.write("jdbc:clickhouse://localhost:8123/default?user=default&password=******", "test_table"));

        // Запустите конвейер.
        p.run().waitUntilFinish();
    }
}

Поддерживаемые типы данных

ClickHouseApache BeamПоддерживаетсяПримечания
TableSchema.TypeName.FLOAT32Schema.TypeName#FLOAT
TableSchema.TypeName.FLOAT64Schema.TypeName#DOUBLE
TableSchema.TypeName.INT8Schema.TypeName#BYTE
TableSchema.TypeName.INT16Schema.TypeName#INT16
TableSchema.TypeName.INT32Schema.TypeName#INT32
TableSchema.TypeName.INT64Schema.TypeName#INT64
TableSchema.TypeName.STRINGSchema.TypeName#STRING
TableSchema.TypeName.UINT8Schema.TypeName#INT16
TableSchema.TypeName.UINT16Schema.TypeName#INT32
TableSchema.TypeName.UINT32Schema.TypeName#INT64
TableSchema.TypeName.UINT64Schema.TypeName#INT64
TableSchema.TypeName.DATESchema.TypeName#DATETIME
TableSchema.TypeName.DATETIMESchema.TypeName#DATETIME
TableSchema.TypeName.ARRAYSchema.TypeName#ARRAY
TableSchema.TypeName.ENUM8Schema.TypeName#STRING
TableSchema.TypeName.ENUM16Schema.TypeName#STRING
TableSchema.TypeName.BOOLSchema.TypeName#BOOLEAN
TableSchema.TypeName.TUPLESchema.TypeName#ROW
TableSchema.TypeName.FIXEDSTRINGFixedBytesFixedBytes — это LogicalType, представляющий байтовый массив фиксированной длины,
определённый в пакете
org.apache.beam.sdk.schemas.logicaltypes
Schema.TypeName#DECIMAL
Schema.TypeName#MAP

Параметры ClickHouseIO.Write

Вы можете настроить конфигурацию ClickHouseIO.Write с помощью следующих функций-сеттеров:

Функция-сеттер параметраТип аргументаЗначение по умолчаниюОписание
withMaxInsertBlockSize(long maxInsertBlockSize)1000000Максимальный размер блока строк для вставки.
withMaxRetries(int maxRetries)5Максимальное число повторных попыток для неуспешных вставок.
withMaxCumulativeBackoff(Duration maxBackoff)Duration.standardDays(1000)Максимальная суммарная продолжительность интервала ожидания (backoff) для повторных попыток.
withInitialBackoff(Duration initialBackoff)Duration.standardSeconds(5)Начальная продолжительность интервала ожидания (backoff) перед первой повторной попыткой.
withInsertDistributedSync(Boolean sync)trueЕсли true, синхронизирует операции вставки для распределённых таблиц.
withInsertQuorum(Long quorum)nullКоличество реплик, необходимых для подтверждения операции вставки.
withInsertDeduplicate(Boolean deduplicate)trueЕсли true, включена дедупликация для операций вставки.
withTableSchema(TableSchema schema)nullСхема целевой таблицы ClickHouse.

Ограничения

При использовании коннектора учитывайте следующие ограничения:

  • На данный момент поддерживается только операция Sink. Коннектор не поддерживает операцию Source.
  • ClickHouse выполняет дедупликацию при вставке в таблицу ReplicatedMergeTree или в таблицу Distributed, построенную поверх ReplicatedMergeTree. Без репликации вставка в обычную MergeTree может привести к дубликатам, если вставка завершилась с ошибкой, а затем была успешно повторена. Однако каждый блок вставляется атомарно, и размер блока можно настроить с помощью ClickHouseIO.Write.withMaxInsertBlockSize(long). Дедупликация достигается за счёт использования контрольных сумм вставленных блоков. Дополнительные сведения о дедупликации см. в разделах Deduplication и Deduplicate insertion config.
  • Коннектор не выполняет никаких DDL-операций; следовательно, целевая таблица должна существовать до вставки.