Подключение Apache NiFi к ClickHouse
Apache NiFi
— это программное обеспечение с открытым исходным кодом для управления рабочими процессами, предназначенное для автоматизации потоков данных между программными системами. Оно позволяет создавать конвейеры данных ETL и поставляется с более чем 300 процессорами данных. Это пошаговое руководство показывает, как подключить Apache NiFi к ClickHouse в качестве источника и приёмника данных, а также загрузить тестовый набор данных.
Соберите сведения о подключении
Чтобы подключиться к ClickHouse по HTTP(S), вам потребуется следующая информация:
| Параметр(ы) | Описание |
|---|---|
HOST и PORT | Обычно используется порт 8443 при использовании TLS или 8123 при отсутствии TLS. |
DATABASE NAME | По умолчанию существует база данных default; используйте имя базы данных, к которой вы хотите подключиться. |
USERNAME и PASSWORD | По умолчанию имя пользователя — default. Используйте имя пользователя, соответствующее вашему сценарию. |
Сведения о вашем сервисе ClickHouse Cloud доступны в консоли ClickHouse Cloud. Выберите сервис и нажмите Connect:

Выберите HTTPS. Параметры подключения отображаются в примере команды curl.

Если вы используете самостоятельное (self-managed) развертывание ClickHouse, параметры подключения задаются администратором ClickHouse.
Загрузите и запустите Apache NiFi
Для нового развертывания скачайте двоичный файл с https://nifi.apache.org/download.html и запустите NiFi командой ./bin/nifi.sh start
Загрузите драйвер ClickHouse JDBC
- Перейдите на страницу релизов драйвера ClickHouse JDBC на GitHub и найдите последнюю версию JDBC-драйвера
- В выбранной версии релиза нажмите «Show all xx assets» и найдите JAR-файл, содержащий ключевое слово
shadedилиall, напримерclickhouse-jdbc-0.5.0-all.jar - Поместите JAR-файл в каталог, доступный Apache NiFi, и запомните абсолютный путь к нему
Добавьте службу контроллера DBCPConnectionPool и настройте её свойства
-
Чтобы настроить Controller Service в Apache NiFi, перейдите на страницу NiFi Flow Configuration, нажав кнопку с иконкой шестерёнки

-
Выберите вкладку Controller Services и добавьте новый Controller Service, нажав кнопку
+в правом верхнем углу
-
Найдите
DBCPConnectionPoolи нажмите кнопку Add
-
Только что добавленная служба контроллера
DBCPConnectionPoolпо умолчанию будет находиться в состоянии Invalid. Нажмите кнопку с иконкой шестерёнки, чтобы начать настройку
-
В разделе Properties введите следующие значения
| Property | Value | Remark |
|---|---|---|
| Database Connection URL | jdbc:ch:https://HOSTNAME:8443/default?ssl=true | Замените HOSTNAME в URL подключения соответствующим значением |
| Database Driver Class Name | com.clickhouse.jdbc.ClickHouseDriver | |
| Database Driver Location(s) | /etc/nifi/nifi-X.XX.X/lib/clickhouse-jdbc-0.X.X-patchXX-shaded.jar | Абсолютный путь к JAR-файлу JDBC-драйвера ClickHouse |
| Database User | default | Имя пользователя ClickHouse |
| Password | password | Пароль ClickHouse |
-
В разделе Settings измените имя Controller Service на ClickHouse JDBC для удобства

-
Активируйте Controller Service
DBCPConnectionPool, нажав кнопку с иконкой молнии, а затем кнопку Enable

-
Проверьте вкладку Controller Services и убедитесь, что Controller Service включён

Чтение из таблицы с помощью процессора ExecuteSQL
-
Добавьте процессор
ExecuteSQLвместе с соответствующими входящими и последующими процессорами
-
В разделе "Properties" процессора
ExecuteSQLзадайте следующие значенияProperty Value Remark Database Connection Pooling Service ClickHouse JDBC Выберите Controller Service, настроенный для ClickHouse SQL select query SELECT * FROM system.metrics Введите здесь свой запрос -
Запустите процессор
ExecuteSQL
-
Чтобы убедиться, что запрос был успешно обработан, изучите один из
FlowFileв выходной очереди
-
Переключите представление в режим "formatted", чтобы просмотреть результат выходного
FlowFile
Запись в таблицу с использованием процессоров MergeRecord и PutDatabaseRecord
-
Для записи нескольких строк в одной операции вставки необходимо сначала объединить несколько записей в одну. Это можно сделать с помощью процессора
MergeRecord -
В разделе «Properties» процессора
MergeRecordвведите следующие значенияProperty Value Remark Record Reader JSONTreeReaderВыберите соответствующий читатель записей Record Writer JSONReadSetWriterВыберите соответствующий писатель записей Minimum Number of Records 1000 Измените это значение на большее, чтобы минимальное количество строк объединялось в одну запись. По умолчанию — 1 строка Maximum Number of Records 10000 Измените это значение на большее, чем «Minimum Number of Records». По умолчанию — 1000 строк -
Чтобы убедиться, что несколько записей объединены в одну, проверьте входные и выходные данные процессора
MergeRecord. Обратите внимание, что выходные данные представляют собой массив из нескольких входных записейВходные данные

Выходные данные

-
В разделе «Properties» процессора
PutDatabaseRecordвведите следующие значенияProperty Value Remark Record Reader JSONTreeReaderВыберите соответствующий читатель записей Database Type Generic Оставьте значение по умолчанию Statement Type INSERT Database Connection Pooling Service ClickHouse JDBC Выберите сервис контроллера ClickHouse Table Name tbl Введите здесь имя вашей таблицы Translate Field Names false Установите значение «false», чтобы вставляемые имена полей совпадали с именами столбцов Maximum Batch Size 1000 Максимальное количество строк на одну вставку. Это значение не должно быть меньше значения «Minimum Number of Records» в процессоре MergeRecord -
Чтобы убедиться, что каждая вставка содержит несколько строк, проверьте, что количество строк в таблице увеличивается как минимум на значение «Minimum Number of Records», определенное в
MergeRecord.
-
Поздравляем — вы успешно загрузили данные в ClickHouse с помощью Apache NiFi!