Перейти к основному содержанию
Перейти к основному содержанию

Подключение Apache NiFi к ClickHouse

Community Maintained

Apache NiFi

— это программное обеспечение с открытым исходным кодом для управления рабочими процессами, предназначенное для автоматизации потоков данных между программными системами. Оно позволяет создавать конвейеры данных ETL и поставляется с более чем 300 процессорами данных. Это пошаговое руководство показывает, как подключить Apache NiFi к ClickHouse в качестве источника и приёмника данных, а также загрузить тестовый набор данных.

Соберите сведения о подключении

Чтобы подключиться к ClickHouse по HTTP(S), вам потребуется следующая информация:

Параметр(ы)Описание
HOST и PORTОбычно используется порт 8443 при использовании TLS или 8123 при отсутствии TLS.
DATABASE NAMEПо умолчанию существует база данных default; используйте имя базы данных, к которой вы хотите подключиться.
USERNAME и PASSWORDПо умолчанию имя пользователя — default. Используйте имя пользователя, соответствующее вашему сценарию.

Сведения о вашем сервисе ClickHouse Cloud доступны в консоли ClickHouse Cloud. Выберите сервис и нажмите Connect:

Кнопка подключения сервиса ClickHouse Cloud

Выберите HTTPS. Параметры подключения отображаются в примере команды curl.

Параметры HTTPS-подключения ClickHouse Cloud

Если вы используете самостоятельное (self-managed) развертывание ClickHouse, параметры подключения задаются администратором ClickHouse.

Загрузите и запустите Apache NiFi

Для нового развертывания скачайте двоичный файл с https://nifi.apache.org/download.html и запустите NiFi командой ./bin/nifi.sh start

Загрузите драйвер ClickHouse JDBC

  1. Перейдите на страницу релизов драйвера ClickHouse JDBC на GitHub и найдите последнюю версию JDBC-драйвера
  2. В выбранной версии релиза нажмите «Show all xx assets» и найдите JAR-файл, содержащий ключевое слово shaded или all, например clickhouse-jdbc-0.5.0-all.jar
  3. Поместите JAR-файл в каталог, доступный Apache NiFi, и запомните абсолютный путь к нему

Добавьте службу контроллера DBCPConnectionPool и настройте её свойства

  1. Чтобы настроить Controller Service в Apache NiFi, перейдите на страницу NiFi Flow Configuration, нажав кнопку с иконкой шестерёнки

    С�траница NiFi Flow Configuration с выделенной кнопкой с иконкой шестерёнки
  2. Выберите вкладку Controller Services и добавьте новый Controller Service, нажав кнопку + в правом верхнем углу

    Вкладка Controller Services с выделенной кнопкой добавления
  3. Найдите DBCPConnectionPool и нажмите кнопку Add

    Диалог выбора Controller Service с выделенным DBCPConnectionPool
  4. Только что добавленная служба контроллера DBCPConnectionPool по умолчанию будет находиться в состоянии Invalid. Нажмите кнопку с иконкой шестерёнки, чтобы начать настройку

    Список Controller Services с DBCPConnectionPool в состоянии Invalid и выделенной кнопкой с иконкой шестерёнки
  5. В разделе Properties введите следующие значения

PropertyValueRemark
Database Connection URLjdbc:ch:https://HOSTNAME:8443/default?ssl=trueЗамените HOSTNAME в URL подключения соответствующим значением
Database Driver Class Namecom.clickhouse.jdbc.ClickHouseDriver
Database Driver Location(s)/etc/nifi/nifi-X.XX.X/lib/clickhouse-jdbc-0.X.X-patchXX-shaded.jarАбсолютный путь к JAR-файлу JDBC-драйвера ClickHouse
Database UserdefaultИмя пользователя ClickHouse
PasswordpasswordПароль ClickHouse
  1. В разделе Settings измените имя Controller Service на ClickHouse JDBC для удобства

    Диалог настройки DBCPConnectionPool с заполненными свойствами
  2. Активируйте Controller Service DBCPConnectionPool, нажав кнопку с иконкой молнии, а затем кнопку Enable

    Список Controller Services с выделенной кнопкой с иконкой молнии

    Диалог подтверждения включения (Enable) Controller Service
  3. Проверьте вкладку Controller Services и убедитесь, что Controller Service включён

    Список Controller Services с включённым сервисом ClickHouse JDBC

Чтение из таблицы с помощью процессора ExecuteSQL

  1. Добавьте процессор ExecuteSQL вместе с соответствующими входящими и последующими процессорами

    Рабочая область NiFi с процессором ExecuteSQL в составе workflow
  2. В разделе "Properties" процессора ExecuteSQL задайте следующие значения

    PropertyValueRemark
    Database Connection Pooling ServiceClickHouse JDBCВыберите Controller Service, настроенный для ClickHouse
    SQL select querySELECT * FROM system.metricsВведите здесь свой запрос
  3. Запустите процессор ExecuteSQL

    Конфигурация процессора ExecuteSQL с заполненными свойствами
  4. Чтобы убедиться, что запрос был успешно обработан, изучите один из FlowFile в выходной очереди

    Диалоговое окно очереди, показывающее FlowFile, готовые к анализу
  5. Переключите представление в режим "formatted", чтобы просмотреть результат выходного FlowFile

    Просмотрщик содержимого FlowFile, показывающий результаты запроса в отформатированном виде

Запись в таблицу с использованием процессоров MergeRecord и PutDatabaseRecord

  1. Для записи нескольких строк в одной операции вставки необходимо сначала объединить несколько записей в одну. Это можно сделать с помощью процессора MergeRecord

  2. В разделе «Properties» процессора MergeRecord введите следующие значения

    PropertyValueRemark
    Record ReaderJSONTreeReaderВыберите соответствующий читатель записей
    Record WriterJSONReadSetWriterВыберите соответствующий писатель записей
    Minimum Number of Records1000Измените это значение на большее, чтобы минимальное количество строк объединялось в одну запись. По умолчанию — 1 строка
    Maximum Number of Records10000Измените это значение на большее, чем «Minimum Number of Records». По умолчанию — 1000 строк
  3. Чтобы убедиться, что несколько записей объединены в одну, проверьте входные и выходные данные процессора MergeRecord. Обратите внимание, что выходные данные представляют собой массив из нескольких входных записей

    Входные данные

    Входные данные процессора MergeRecord, показывающие отдельные записи

    Выходные данные

    Выходные данные процессора MergeRecord, показывающие объединенный массив записей
  4. В разделе «Properties» процессора PutDatabaseRecord введите следующие значения

    PropertyValueRemark
    Record ReaderJSONTreeReaderВыберите соответствующий читатель записей
    Database TypeGenericОставьте значение по умолчанию
    Statement TypeINSERT
    Database Connection Pooling ServiceClickHouse JDBCВыберите сервис контроллера ClickHouse
    Table NametblВведите здесь имя вашей таблицы
    Translate Field NamesfalseУстановите значение «false», чтобы вставляемые имена полей совпадали с именами столбцов
    Maximum Batch Size1000Максимальное количество строк на одну вставку. Это значение не должно быть меньше значения «Minimum Number of Records» в процессоре MergeRecord
  5. Чтобы убедиться, что каждая вставка содержит несколько строк, проверьте, что количество строк в таблице увеличивается как минимум на значение «Minimum Number of Records», определенное в MergeRecord.

    Результаты запроса, показывающие количество строк в целевой таблице
  6. Поздравляем — вы успешно загрузили данные в ClickHouse с помощью Apache NiFi!