Табличный движок RabbitMQ
Этот движок позволяет интегрировать ClickHouse с RabbitMQ.
RabbitMQ позволяет:
- Публиковать или подписываться на потоки данных.
- Обрабатывать потоки по мере их поступления.
Создание таблицы
Обязательные параметры:
rabbitmq_host_port– host:port (например,localhost:5672).rabbitmq_exchange_name– имя обмена RabbitMQ.rabbitmq_format– формат сообщений. Использует ту же нотацию, что и SQL-функцияFORMAT, напримерJSONEachRow. Дополнительные сведения см. в разделе Форматы.
Необязательные параметры:
-
rabbitmq_exchange_type– Тип обменника RabbitMQ:direct,fanout,topic,headers,consistent_hash. По умолчанию:fanout. -
rabbitmq_routing_key_list– Список ключей маршрутизации (routing keys), разделённых запятыми. -
rabbitmq_schema– Параметр, который необходимо использовать, если формат требует определения схемы. Например, Cap'n Proto требует указать путь к файлу схемы и имя корневого объектаschema.capnp:Message. -
rabbitmq_num_consumers– Количество consumers на таблицу. Укажите большее число consumers, если пропускной способности одного недостаточно. По умолчанию:1. -
rabbitmq_num_queues– Общее количество очередей. Увеличение этого числа может значительно повысить производительность. По умолчанию:1. -
rabbitmq_queue_base- Укажите префикс для имён очередей. Сценарии использования этого параметра описаны ниже. -
rabbitmq_deadletter_exchange- Укажите имя для dead letter exchange. Вы можете создать другую таблицу с этим именем exchange и собирать сообщения в случаях, когда они повторно публикуются в dead letter exchange. По умолчанию dead letter exchange не задан. -
rabbitmq_persistent- Если установлено в 1 (true), в запросе INSERT режим доставки будет установлен в 2 (помечает сообщения какpersistent). По умолчанию:0. -
rabbitmq_skip_broken_messages– Допустимое количество сообщений RabbitMQ, несовместимых со схемой, в одном блоке при разборе. Еслиrabbitmq_skip_broken_messages = N, то движок пропускает N сообщений RabbitMQ, которые не удаётся разобрать (одно сообщение соответствует одной строке данных). По умолчанию:0. -
rabbitmq_max_block_size- Количество строк, собираемых перед сбросом данных из RabbitMQ. По умолчанию: max_insert_block_size. -
rabbitmq_flush_interval_ms- Таймаут для сброса данных из RabbitMQ. По умолчанию: stream_flush_interval_ms. -
rabbitmq_queue_settings_list- Позволяет задать настройки RabbitMQ при создании очереди. Доступные настройки:x-max-length,x-max-length-bytes,x-message-ttl,x-expires,x-priority,x-max-priority,x-overflow,x-dead-letter-exchange,x-queue-type. Параметрdurableдля очереди включается автоматически. -
rabbitmq_address- Адрес для подключения. Используйте либо этот параметр, либоrabbitmq_host_port. -
rabbitmq_vhost- RabbitMQ vhost. По умолчанию:'\. -
rabbitmq_queue_consume- Использовать заранее созданные (пользовательские) очереди и не выполнять никакой конфигурации RabbitMQ: объявление exchanges, очередей, связей (bindings). По умолчанию:false. -
rabbitmq_username- Имя пользователя RabbitMQ. -
rabbitmq_password- Пароль RabbitMQ. -
reject_unhandled_messages- Отклонять сообщения (отправлять в RabbitMQ отрицательное подтверждение) в случае ошибок. Этот параметр автоматически включается, если заданx-dead-letter-exchangeвrabbitmq_queue_settings_list. -
rabbitmq_commit_on_select- Фиксировать сообщения при выполнении запроса SELECT. По умолчанию:false. -
rabbitmq_max_rows_per_message— Максимальное количество строк, записываемых в одно сообщение RabbitMQ для построчных форматов. По умолчанию:1. -
rabbitmq_empty_queue_backoff_start— Начальная точка backoff для переназначения чтения, если очередь RabbitMQ пуста. -
rabbitmq_empty_queue_backoff_end— Конечная точка backoff для переназначения чтения, если очередь RabbitMQ пуста. -
rabbitmq_handle_error_mode— Способ обработки ошибок для движка RabbitMQ. Возможные значения: default (будет выброшено исключение, если не удаётся разобрать сообщение), stream (текст исключения и исходное сообщение будут сохранены во виртуальных столбцах_errorи_raw_message), dead_letter_queue (данные, связанные с ошибкой, будут сохранены в system.dead_letter_queue).- SSL connection:
Используйте либо rabbitmq_secure = 1, либо amqps в адресе подключения: rabbitmq_address = 'amqps://guest:guest@localhost/vhost'.
Поведение используемой библиотеки по умолчанию — не проверять, является ли созданное TLS‑подключение достаточно безопасным. Независимо от того, истёк ли срок действия сертификата, он самоподписанный, отсутствует или недействителен, соединение просто разрешается. Более строгая проверка сертификатов может быть реализована в будущем.
Также к настройкам, связанным с RabbitMQ, могут быть добавлены параметры формата.
Пример:
Конфигурацию сервера RabbitMQ следует добавить в конфигурационный файл ClickHouse.
Требуемая конфигурация:
Дополнительная настройка:
Описание
SELECT не особенно полезен для чтения сообщений (кроме отладки), потому что каждое сообщение можно прочитать только один раз. Гораздо практичнее создавать потоки в реальном времени с помощью материализованных представлений. Для этого:
- Используйте движок, чтобы создать потребителя RabbitMQ и рассматривать его как поток данных.
- Создайте таблицу с требуемой структурой.
- Создайте материализованное представление, которое преобразует данные из движка и помещает их в ранее созданную таблицу.
Когда MATERIALIZED VIEW связывается с движком, оно начинает собирать данные в фоновом режиме. Это позволяет непрерывно получать сообщения из RabbitMQ и конвертировать их в требуемый формат с помощью SELECT.
Одна таблица RabbitMQ может иметь любое количество материализованных представлений.
Данные могут направляться на основе rabbitmq_exchange_type и указанного rabbitmq_routing_key_list.
В одной таблице может быть не более одного exchange. Один exchange может использоваться несколькими таблицами — это позволяет выполнять маршрутизацию в несколько таблиц одновременно.
Варианты типа exchange:
direct— маршрутизация основана на точном совпадении ключей. Пример списка ключей таблицы:key1,key2,key3,key4,key5, ключ сообщения может быть равен любому из них.fanout— маршрутизация во все таблицы (в которых имя exchange совпадает) независимо от ключей.topic— маршрутизация основана на шаблонах с ключами, разделёнными точкой. Примеры:*.logs,records.*.*.2020,*.2018,*.2019,*.2020.headers— маршрутизация основана на совпаденияхkey=valueс параметромx-match=allилиx-match=any. Пример списка ключей таблицы:x-match=all,format=logs,type=report,year=2020.consistent_hash— данные равномерно распределяются между всеми привязанными таблицами (в которых имя exchange совпадает). Обратите внимание, что этот тип exchange должен быть включён с помощью плагина RabbitMQ:rabbitmq-plugins enable rabbitmq_consistent_hash_exchange.
Настройка rabbitmq_queue_base может использоваться в следующих случаях:
- чтобы позволить разным таблицам разделять очереди, так что для одних и тех же очередей может быть зарегистрировано несколько потребителей, что повышает производительность. Если используются настройки
rabbitmq_num_consumersи/илиrabbitmq_num_queues, то точное совпадение очередей достигается в случае, когда эти параметры одинаковы. - чтобы иметь возможность восстановить чтение из определённых устойчивых (durable) очередей, когда не все сообщения были успешно потреблены. Чтобы возобновить потребление из одной конкретной очереди — укажите её имя в настройке
rabbitmq_queue_baseи не задавайтеrabbitmq_num_consumersиrabbitmq_num_queues(по умолчанию 1). Чтобы возобновить потребление из всех очередей, которые были объявлены для конкретной таблицы, просто укажите те же настройки:rabbitmq_queue_base,rabbitmq_num_consumers,rabbitmq_num_queues. По умолчанию имена очередей будут уникальны для таблиц. - чтобы повторно использовать очереди, так как они объявлены как durable и не удаляются автоматически. (Могут быть удалены с помощью любых CLI‑инструментов RabbitMQ.)
Для повышения производительности полученные сообщения группируются в блоки размером max_insert_block_size. Если блок не был сформирован в течение stream_flush_interval_ms миллисекунд, данные будут записаны в таблицу независимо от полноты блока.
Если настройки rabbitmq_num_consumers и/или rabbitmq_num_queues указаны вместе с rabbitmq_exchange_type, то:
- должен быть включён плагин
rabbitmq-consistent-hash-exchange; - должно быть указано свойство
message_idпубликуемых сообщений (уникальное для каждого сообщения/пакета).
Для INSERT‑запроса доступна метаинформация сообщения, которая добавляется для каждого опубликованного сообщения: messageID и флаг republished (true, если сообщение было опубликовано более одного раза) — они доступны через заголовки сообщения.
Не используйте одну и ту же таблицу для вставок и материализованных представлений.
Пример:
Виртуальные столбцы
_exchange_name— имя exchange в RabbitMQ. Тип данных:String._channel_id— идентификатор канала (ChannelID), на котором был объявлен consumer, получивший сообщение. Тип данных:String._delivery_tag— DeliveryTag полученного сообщения. Область действия — один канал. Тип данных:UInt64._redelivered— флагredeliveredсообщения. Тип данных:UInt8._message_id— идентификатор сообщения (messageID) полученного сообщения; непустой, если был установлен при публикации сообщения. Тип данных:String._timestamp— временная метка (timestamp) полученного сообщения; непустая, если была установлена при публикации сообщения. Тип данных:UInt64.
Дополнительные виртуальные столбцы при rabbitmq_handle_error_mode='stream':
_raw_message— исходное сообщение, которое не удалось успешно разобрать. Тип данных:Nullable(String)._error— текст исключения, возникшего при ошибке разбора. Тип данных:Nullable(String).
Примечание: виртуальные столбцы _raw_message и _error заполняются только в случае возникновения исключения во время разбора; при успешном разборе сообщения они всегда равны NULL.
Ограничения
Даже если вы укажете выражения значений по умолчанию для столбцов (такие как DEFAULT, MATERIALIZED, ALIAS) в определении таблицы, они будут игнорироваться. Вместо этого столбцы будут заполняться значениями по умолчанию для соответствующих типов.
Поддержка форматов данных
Движок RabbitMQ поддерживает все форматы, которые поддерживаются в ClickHouse. Количество строк в одном сообщении RabbitMQ зависит от того, является ли формат построчным или блочным:
- Для построчных форматов количество строк в одном сообщении RabbitMQ можно контролировать с помощью настройки
rabbitmq_max_rows_per_message. - Для блочных форматов нельзя разделить блок на более мелкие части, но количество строк в одном блоке можно контролировать глобальной настройкой max_block_size.