Перейти к основному содержанию
Перейти к основному содержанию

Дозагрузка данных

Независимо от того, являетесь ли вы новым пользователем ClickHouse или отвечаете за существующее развертывание, рано или поздно вам потребуется дозагрузить в таблицы исторические данные. В некоторых случаях это относительно просто, но может становиться сложнее, когда нужно заполнить материализованные представления. В этом руководстве описаны некоторые подходы к решению этой задачи, которые пользователи могут адаптировать под свои сценарии.

Примечание

В этом руководстве предполагается, что пользователи уже знакомы с концепцией инкрементных материализованных представлений и загрузки данных с использованием табличных функций, таких как S3 и GCS. Мы также рекомендуем ознакомиться с нашим руководством по оптимизации производительности вставки из объектного хранилища, рекомендации из которого можно применять к операциям вставки во всех примерах этого руководства.

Пример набора данных

Во всём этом руководстве мы используем набор данных PyPI. Каждая строка в этом наборе данных представляет загрузку Python‑пакета с использованием такого инструмента, как pip.

Например, этот поднабор охватывает один день — 2024-12-17 — и доступен в открытом доступе по адресу https://datasets-documentation.s3.eu-west-3.amazonaws.com/pypi/2024-12-17/. Пользователи могут выполнять запросы с помощью:

SELECT count()
FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/pypi/2024-12-17/*.parquet')

┌────count()─┐
│ 2039988137 │ -- 2,04 миллиарда
└────────────┘

1 row in set. Elapsed: 32.726 sec. Processed 2.04 billion rows, 170.05 KB (62.34 million rows/s., 5.20 KB/s.)
Peak memory usage: 239.50 MiB.

Полный набор данных для этого бакета содержит более 320 ГБ файлов Parquet. В примерах ниже мы намеренно выбираем подмножества с помощью glob-шаблонов.

Мы предполагаем, что пользователь получает поток этих данных, например из Kafka или объектного хранилища, начиная с этой даты. Схема этих данных показана ниже.

DESCRIBE TABLE s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/pypi/2024-12-17/*.parquet')
FORMAT PrettyCompactNoEscapesMonoBlock
SETTINGS describe_compact_output = 1

┌─name───────────────┬─type────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│ timestamp │ Nullable(DateTime64(6))                                                                                                                 │
│ country_code       │ Nullable(String)                                                                                                                        │
│ url │ Nullable(String)                                                                                                                        │
│ project            │ Nullable(String)                                                                                                                        │
│ file │ Tuple(filename Nullable(String), project Nullable(String), version Nullable(String), type Nullable(String))                             │
│ installer          │ Tuple(name Nullable(String), version Nullable(String))                                                                                  │
│ python             │ Nullable(String)                                                                                                                        │
│ implementation     │ Tuple(name Nullable(String), version Nullable(String))                                                                                  │
│ distro             │ Tuple(name Nullable(String), version Nullable(String), id Nullable(String), libc Tuple(lib Nullable(String), version Nullable(String))) │
│ system │ Tuple(name Nullable(String), release Nullable(String))                                                                                  │
│ cpu                │ Nullable(String)                                                                                                                        │
│ openssl_version    │ Nullable(String)                                                                                                                        │
│ setuptools_version │ Nullable(String)                                                                                                                        │
│ rustc_version      │ Nullable(String)                                                                                                                        │
│ tls_protocol       │ Nullable(String)                                                                                                                        │
│ tls_cipher         │ Nullable(String)                                                                                                                        │
└────────────────────┴─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
Примечание

Полный набор данных PyPI, состоящий более чем из 1 триллиона строк, доступен в нашем публичном демонстрационном окружении clickpy.clickhouse.com. Дополнительные сведения об этом наборе данных, включая то, как демо использует материализованные представления для повышения производительности и как данные ежедневно загружаются, см. здесь.

Сценарии дозаполнения данных задним числом

Дозаполнение исторических данных обычно требуется, когда потребление потока данных начинается с определённого момента времени. Эти данные вставляются в таблицы ClickHouse с помощью инкрементальных материализованных представлений, которые срабатывают на блоки данных по мере их вставки. Эти представления могут трансформировать данные перед вставкой или вычислять агрегаты и отправлять результаты в целевые таблицы для последующего использования в нижестоящих приложениях.

Мы рассмотрим следующие сценарии:

  1. Дозаполнение исторических данных при существующей ингестии данных — загружаются новые данные, и необходимо дозаполнить исторические данные. Эти исторические данные уже определены.
  2. Добавление материализованных представлений к существующим таблицам — необходимо добавить новые материализованные представления к конфигурации, для которой исторические данные уже загружены и поток данных уже идёт.

Мы предполагаем, что исторические данные будут дозаполняться из объектного хранилища. Во всех случаях мы стремимся избежать пауз во вставке данных.

Мы рекомендуем дозаполнять исторические данные из объектного хранилища. Данные по возможности следует экспортировать в формат Parquet для оптимальной производительности чтения и сжатия (уменьшения сетевого трафика). Размер файла порядка 150 МБ обычно является предпочтительным, однако ClickHouse поддерживает более 70 форматов файлов и может обрабатывать файлы любого размера.

Использование дублирующих таблиц и представлений

Во всех сценариях мы опираемся на концепцию «дублирующих таблиц и представлений». Эти таблицы и представления являются копиями тех, что используются для потоковых данных в реальном времени, и позволяют выполнять backfill (дозагрузку данных задним числом) изолированно, с простым механизмом восстановления в случае сбоя. Например, у нас есть следующая основная таблица pypi и материализованное представление, которое вычисляет количество загрузок для каждого Python‑проекта:

CREATE TABLE pypi
(
    `timestamp` DateTime,
    `country_code` LowCardinality(String),
    `project` String,
    `type` LowCardinality(String),
    `installer` LowCardinality(String),
    `python_minor` LowCardinality(String),
    `system` LowCardinality(String),
    `on` String
)
ENGINE = MergeTree
ORDER BY (project, timestamp)

CREATE TABLE pypi_downloads
(
    `project` String,
    `count` Int64
)
ENGINE = SummingMergeTree
ORDER BY project

CREATE MATERIALIZED VIEW pypi_downloads_mv TO pypi_downloads
AS SELECT
 project,
    count() AS count
FROM pypi
GROUP BY project

Мы заполняем основную таблицу и связанное представление подмножеством данных:

INSERT INTO pypi SELECT *
FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/pypi/2024-12-17/1734393600-000000000{000..100}.parquet')

0 строк в наборе. Время выполнения: 15.702 сек. Обработано 41.23 млн строк, 3.94 ГБ (2.63 млн строк/с., 251.01 МБ/с.)
Пиковое использование памяти: 977.49 МиБ.

SELECT count() FROM pypi

┌──count()─┐
│ 20612750 │ -- 20,61 млн
└──────────┘

1 строка в наборе. Время выполнения: 0.004 сек.

SELECT sum(count)
FROM pypi_downloads

┌─sum(count)─┐
│   20612750 │ -- 20,61 млн
└────────────┘

1 строка в наборе. Время выполнения: 0.006 сек. Обработано 96.15 тыс. строк, 769.23 КБ (16.53 млн строк/с., 132.26 МБ/с.)
Пиковое использование памяти: 682.38 КиБ.

Предположим, мы хотим загрузить другое подмножество {101..200}. Хотя мы могли бы вставлять данные напрямую в pypi, мы можем выполнить этот бэкфилл изолированно, создав дублирующие таблицы.

Если этот бэкфилл завершится неудачно, мы не затронем наши основные таблицы и сможем просто очистить дублирующие таблицы и повторить попытку.

Чтобы создать новые копии этих представлений, мы можем использовать конструкцию CREATE TABLE AS с суффиксом _v2:

CREATE TABLE pypi_v2 AS pypi

CREATE TABLE pypi_downloads_v2 AS pypi_downloads

CREATE MATERIALIZED VIEW pypi_downloads_mv_v2 TO pypi_downloads_v2
AS SELECT
 project,
    count() AS count
FROM pypi_v2
GROUP BY project

Мы заполняем его вторым подмножеством примерно такого же размера и убеждаемся, что загрузка прошла успешно.

INSERT INTO pypi_v2 SELECT *
FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/pypi/2024-12-17/1734393600-000000000{101..200}.parquet')

0 строк в наборе. Прошло: 17,545 сек. Обработано 40,80 млн строк, 3,90 ГБ (2,33 млн строк/сек., 222,29 МБ/сек.)
Пиковое потребление памяти: 991,50 МиБ.

SELECT count()
FROM pypi_v2

┌──count()─┐
│ 20400020 │ -- 20,40 миллиона
└──────────┘

1 строка в наборе. Прошло: 0,004 сек.

SELECT sum(count)
FROM pypi_downloads_v2

┌─sum(count)─┐
│   20400020 │ -- 20,40 миллиона
└────────────┘

1 строка в наборе. Прошло: 0,006 сек. Обработано 95,49 тыс. строк, 763,90 КБ (14,81 млн строк/сек., 118,45 МБ/сек.)
Пиковое потребление памяти: 688,77 КиБ.

Если бы мы столкнулись с ошибкой на любом этапе второй загрузки, мы могли бы просто очистить таблицы pypi_v2 и pypi_downloads_v2 и повторить загрузку данных.

После завершения загрузки данных мы можем переместить данные из дублирующих таблиц в основные таблицы с помощью оператора ALTER TABLE MOVE PARTITION.

ALTER TABLE pypi_v2 MOVE PARTITION () TO pypi

0 строк в наборе. Прошло: 1.401 сек.

ALTER TABLE pypi_downloads_v2 MOVE PARTITION () TO pypi_downloads

0 строк в наборе. Прошло: 0.389 сек.
Имена партиций

Приведённый выше вызов MOVE PARTITION использует имя партиции (). Оно соответствует единственной партиции для этой таблицы (которая не разбита на партиции). Для таблиц, которые разбиты на партиции, потребуется выполнить несколько вызовов MOVE PARTITION — по одному для каждой партиции. Имена текущих партиций можно получить из таблицы system.parts, например: SELECT DISTINCT partition FROM system.parts WHERE (table = 'pypi_v2').

Теперь мы можем убедиться, что pypi и pypi_downloads содержат все данные. Таблицы pypi_downloads_v2 и pypi_v2 можно безопасно удалить.

SELECT count()
FROM pypi

┌──count()─┐
│ 41012770 │ -- 41,01 миллиона
└──────────┘

1 строка в наборе. Время выполнения: 0.003 с.

SELECT sum(count)
FROM pypi_downloads

┌─sum(count)─┐
│   41012770 │ -- 41,01 миллиона
└────────────┘

1 строка в наборе. Время выполнения: 0.007 с. Обработано 191,64 тыс. строк, 1,53 МБ (27,34 млн строк/с, 218,74 МБ/с).

SELECT count()
FROM pypi_v2

Важно, что операция MOVE PARTITION одновременно лёгковесна (использует жёсткие ссылки) и атомарна, т.е. либо завершается успешно, либо с ошибкой, без промежуточного состояния.

Мы активно используем этот подход в описанных ниже сценариях дозагрузки данных (backfilling).

Обратите внимание, что этот процесс требует от пользователей выбора размера каждой операции вставки.

Более крупные вставки, т.е. больше строк, означают, что потребуется меньше операций MOVE PARTITION. Однако это необходимо сбалансировать с затратами на восстановление в случае сбоя вставки, например из‑за обрыва сети. Пользователи могут дополнить этот процесс пакетной обработкой файлов для снижения риска. Это можно выполнять либо с помощью диапазонных запросов, например WHERE timestamp BETWEEN 2024-12-17 09:00:00 AND 2024-12-17 10:00:00, либо с помощью glob-шаблонов. Например,

INSERT INTO pypi_v2 SELECT *
FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/pypi/2024-12-17/1734393600-000000000{101..200}.parquet')
INSERT INTO pypi_v2 SELECT *
FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/pypi/2024-12-17/1734393600-000000000{201..300}.parquet')
INSERT INTO pypi_v2 SELECT *
FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/pypi/2024-12-17/1734393600-000000000{301..400}.parquet')
--продолжается до загрузки всех файлов ИЛИ выполнения команды MOVE PARTITION
Примечание

ClickPipes использует этот подход при загрузке данных из объектного хранилища, автоматически создавая дубликаты целевой таблицы и её материализованных представлений и избавляя пользователя от необходимости выполнять описанные выше шаги. Дополнительно, за счёт использования нескольких рабочих потоков, каждый из которых обрабатывает свои подмножества данных (через glob-шаблоны) и использует собственные дубликаты таблиц, данные могут загружаться быстро с соблюдением семантики «ровно один раз». Заинтересованные читатели могут найти дополнительные подробности в этой статье блога.

Сценарий 1: Дозагрузка данных при существующей ингестии данных

В этом сценарии мы предполагаем, что данные для дозагрузки не находятся в отдельном бакете, поэтому требуется фильтрация. Данные уже вставляются, и можно определить метку времени или монотонно возрастающий столбец, начиная с которого необходимо выполнить дозагрузку исторических данных.

Этот процесс включает следующие шаги:

  1. Определить контрольную точку — либо метку времени, либо значение столбца, начиная с которого необходимо восстановить исторические данные.
  2. Создать дубликаты основной таблицы и целевых таблиц для материализованных представлений.
  3. Создать копии всех материализованных представлений, которые ссылаются на целевые таблицы, созданные на шаге (2).
  4. Вставить данные в дублирующую основную таблицу, созданную на шаге (2).
  5. Переместить все партиции из дублирующих таблиц в их исходные таблицы. Удалить дублирующие таблицы.

Например, в наших данных PyPI предположим, что данные уже загружены. Мы можем определить минимальную метку времени и, таким образом, нашу «контрольную точку».

SELECT min(timestamp)
FROM pypi

┌──────min(timestamp)─┐
│ 2024-12-17 09:00:00 │
└─────────────────────┘

Получена 1 строка. Время выполнения: 0.163 сек. Обработано 1.34 млрд строк, 5.37 ГБ (8.24 млрд строк/сек., 32.96 ГБ/сек.)
Пиковое потребление памяти: 227.84 МиБ.

Из сказанного выше мы знаем, что нам нужно загрузить данные с временными метками до 2024-12-17 09:00:00. Используя описанный ранее процесс, мы создаём дубликаты таблиц и представлений и загружаем подмножество данных с помощью фильтра по временной метке.

CREATE TABLE pypi_v2 AS pypi

CREATE TABLE pypi_downloads_v2 AS pypi_downloads

CREATE MATERIALIZED VIEW pypi_downloads_mv_v2 TO pypi_downloads_v2
AS SELECT project, count() AS count
FROM pypi_v2
GROUP BY project

INSERT INTO pypi_v2 SELECT *
FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/pypi/2024-12-17/1734393600-*.parquet')
WHERE timestamp < '2024-12-17 09:00:00'

0 rows in set. Elapsed: 500.152 sec. Processed 2.74 billion rows, 364.40 GB (5.47 million rows/s., 728.59 MB/s.)
Примечание

Фильтрация по столбцам с временными метками в Parquet может быть очень эффективной. ClickHouse будет читать только столбец с временными метками, чтобы определить полные диапазоны данных для загрузки, минимизируя сетевой трафик. Индексы Parquet, такие как min-max, также могут использоваться движком выполнения запросов ClickHouse.

После завершения операции вставки мы можем переместить соответствующие партиции.

ALTER TABLE pypi_v2 MOVE PARTITION () TO pypi

ALTER TABLE pypi_downloads_v2 MOVE PARTITION () TO pypi_downloads

Если исторические данные находятся в отдельном bucket-е, описанный выше фильтр по времени не требуется. Если временного или монотонного столбца нет, изолируйте исторические данные.

Просто используйте ClickPipes в ClickHouse Cloud

Пользователям ClickHouse Cloud следует использовать ClickPipes для восстановления исторических резервных копий, если данные можно изолировать в отдельном bucket-е (и фильтр не требуется). Помимо параллельной загрузки несколькими воркерами, что уменьшает время загрузки, ClickPipes автоматизирует описанный выше процесс — создаёт дубликаты таблиц как для основной таблицы, так и для материализованных представлений.

Сценарий 2: Добавление материализованных представлений к существующим таблицам

Нередко возникает необходимость добавить новые материализованные представления в конфигурацию, для которой уже накоплен значительный объём данных и продолжается вставка. В этом случае полезен столбец с меткой времени или монотонно возрастающим значением, который можно использовать для идентификации точки в потоке, что позволяет избежать пауз в ингестии данных. В примерах ниже мы рассматриваем оба случая, отдавая предпочтение подходам, которые позволяют избежать пауз в ингестии.

Avoid POPULATE

Мы не рекомендуем использовать команду POPULATE для заполнения задним числом материализованных представлений, за исключением небольших наборов данных, для которых приём приостановлен. Этот оператор может пропускать строки, вставленные в исходную таблицу, если материализованное представление создаётся после завершения выполнения POPULATE. Кроме того, эта операция выполняется по всем данным и уязвима к прерываниям или ограничениям по памяти на больших наборах данных.

Наличие столбца с меткой времени или монотонно возрастающим значением

В этом случае мы рекомендуем включить в новое материализованное представление фильтр, который ограничивает строки теми, что больше некоторого произвольного значения в будущем. Затем материализованное представление можно заполнить задним числом, начиная с этой даты, используя исторические данные из основной таблицы. Подход к такому заполнению зависит от объёма данных и сложности соответствующего запроса.

Наш самый простой подход включает следующие шаги:

  1. Создать материализованное представление с фильтром, который учитывает только строки, большие некоторого произвольного момента времени в ближайшем будущем.
  2. Выполнить запрос INSERT INTO SELECT, который вставляет данные в целевую таблицу нашего материализованного представления, читая из исходной таблицы с использованием агрегирующего запроса представления.

Данный подход можно дополнительно улучшить, нацеливаясь на подмножества данных на шаге (2) и/или используя дубликат целевой таблицы для материализованного представления (прикрепляя партиции к исходной после завершения вставки) для упрощения восстановления после сбоев.

Рассмотрим следующее материализованное представление, которое рассчитывает самые популярные проекты за каждый час.

CREATE TABLE pypi_downloads_per_day
(
    `hour` DateTime,
    `project` String,
    `count` Int64
)
ENGINE = SummingMergeTree
ORDER BY (project, hour)

CREATE MATERIALIZED VIEW pypi_downloads_per_day_mv TO pypi_downloads_per_day
AS SELECT
 toStartOfHour(timestamp) as hour,
 project,
    count() AS count
FROM pypi
GROUP BY
    hour,
 project

Хотя мы уже можем добавить целевую таблицу, перед добавлением материализованного представления мы изменяем выражение SELECT, чтобы включить фильтр, который учитывает только строки с временем больше некоторого произвольного момента в ближайшем будущем — в данном случае мы предполагаем, что 2024-12-17 09:00:00 наступит через несколько минут.

CREATE MATERIALIZED VIEW pypi_downloads_per_day_mv TO pypi_downloads_per_day
AS SELECT
 toStartOfHour(timestamp) AS hour,
 project, count() AS count
FROM pypi WHERE timestamp >= '2024-12-17 09:00:00'
GROUP BY hour, project

После добавления этого представления мы можем задним числом дозагрузить все данные для материализованного представления до этого момента.

Самый простой способ сделать это — выполнить запрос материализованного представления по основной таблице с фильтром, который игнорирует недавно добавленные данные, вставив результаты в целевую таблицу нашего представления с помощью INSERT INTO SELECT. Например, для приведённого выше представления:

INSERT INTO pypi_downloads_per_day SELECT
 toStartOfHour(timestamp) AS hour,
 project,
    count() AS count
FROM pypi
WHERE timestamp < '2024-12-17 09:00:00'
GROUP BY
    hour,
 project

Ok.

0 rows in set. Elapsed: 2.830 sec. Processed 798.89 million rows, 17.40 GB (282.28 million rows/s., 6.15 GB/s.)
Пик использования памяти: 543.71 MiB.
Примечание

В приведённом выше примере целевой таблицей служит SummingMergeTree. В этом случае мы можем просто использовать исходный запрос агрегации. Для более сложных сценариев, в которых используется AggregatingMergeTree, следует применять функции -State для агрегирования. Пример такого подхода можно найти здесь.

В нашем случае это относительно лёгкая агрегация, которая завершается менее чем за 3 секунды и использует менее 600MiB памяти. Для более сложных или длительных агрегаций пользователи могут сделать этот процесс более устойчивым, используя ранее описанный подход с дублирующей таблицей, то есть создать теневую целевую таблицу, например pypi_downloads_per_day_v2, выполнять вставку в неё и затем присоединять получившиеся партиции к pypi_downloads_per_day.

Часто запрос материализованного представления может быть более сложным (что нередко, иначе пользователи не стали бы использовать представление!) и потреблять значительные ресурсы. В более редких случаях ресурсов, необходимых для выполнения запроса, может не хватать мощности сервера. Это подчёркивает одно из преимуществ материализованных представлений в ClickHouse — они обновляются инкрементально и не обрабатывают весь набор данных за один проход!

В этом случае у пользователей есть несколько вариантов:

  1. Модифицировать запрос для дозаполнения по диапазонам, например WHERE timestamp BETWEEN 2024-12-17 08:00:00 AND 2024-12-17 09:00:00, WHERE timestamp BETWEEN 2024-12-17 07:00:00 AND 2024-12-17 08:00:00 и т. д.
  2. Использовать движок таблиц Null для заполнения материализованного представления. Это имитирует типичное инкрементальное наполнение материализованного представления, выполняя его запрос над блоками данных (настраиваемого размера).

Вариант (1) является самым простым подходом и часто достаточен. Мы не приводим примеры ради краткости.

Вариант (2) рассматривается подробнее ниже.

Использование движка таблиц Null для заполнения материализованных представлений

Движок таблиц Null предоставляет движок хранения, который не сохраняет данные (думайте о нём как о /dev/null в мире движков таблиц). Хотя это может казаться противоречивым, материализованные представления всё равно будут выполняться над данными, вставляемыми в этот движок таблиц. Это позволяет создавать материализованные представления без сохранения исходных данных — избегая I/O и связанного с этим хранения.

Важно, что любые материализованные представления, прикреплённые к этому движку таблиц, продолжают выполняться над блоками данных по мере их вставки, отправляя результаты в целевую таблицу. Размер этих блоков настраивается. Более крупные блоки потенциально могут быть более эффективными (и быстрее обрабатываться), но они потребляют больше ресурсов (в первую очередь памяти). Использование этого движка таблиц означает, что мы можем строить наше материализованное представление инкрементально, то есть по одному блоку за раз, избегая необходимости удерживать всю агрегацию в памяти.

Денормализация в ClickHouse

Рассмотрим следующий пример:

CREATE TABLE pypi_v2
(
    `timestamp` DateTime,
    `project` String
)
ENGINE = Null

CREATE MATERIALIZED VIEW pypi_downloads_per_day_mv_v2 TO pypi_downloads_per_day
AS SELECT
 toStartOfHour(timestamp) as hour,
 project,
    count() AS count
FROM pypi_v2
GROUP BY
    hour,
 project

Здесь мы создаем таблицу с движком Null pypi_v2, чтобы принимать строки, которые будут использованы для построения нашего материализованного представления. Обратите внимание, что мы ограничиваем схему только теми столбцами, которые нам нужны. Наше материализованное представление выполняет агрегацию по строкам, вставленным в эту таблицу (по одному блоку за раз), отправляя результаты в нашу целевую таблицу pypi_downloads_per_day.

Примечание

В качестве целевой таблицы мы используем здесь pypi_downloads_per_day. Для повышения отказоустойчивости пользователи могут создать дубликат таблицы pypi_downloads_per_day_v2 и использовать ее в качестве целевой таблицы представления, как показано в предыдущих примерах. По завершении вставки разделы в pypi_downloads_per_day_v2 могут, в свою очередь, быть перенесены в pypi_downloads_per_day. Это позволит выполнить восстановление в случае, если вставка завершится сбоем из‑за проблем с памятью или прерывания работы сервера, т.е. мы просто очищаем pypi_downloads_per_day_v2, настраиваем параметры и повторяем попытку.

Чтобы заполнить это материализованное представление, мы просто вставляем соответствующие данные для догрузки (backfill) в pypi_v2 из pypi.

INSERT INTO pypi_v2 SELECT timestamp, project FROM pypi WHERE timestamp < '2024-12-17 09:00:00'

0 строк в наборе. Прошло: 27.325 сек. Обработано 1.50 млрд строк, 33.48 ГБ (54.73 млн строк/сек., 1.23 ГБ/сек.)
Пиковое потребление памяти: 639.47 МиБ.

Обратите внимание, что здесь использование памяти составляет 639.47 MiB.

Настройка производительности и ресурсов

На производительность и потребление ресурсов в описанном выше сценарии влияет несколько факторов. Перед тем как приступать к настройке, мы рекомендуем ознакомиться с механикой вставки, подробно описанной в разделе Using Threads for Reads руководства Optimizing for S3 Insert and Read Performance. Вкратце:

  • Параллелизм чтения (Read Parallelism) — количество потоков, используемых для чтения. Управляется через max_threads. В ClickHouse Cloud определяется размером экземпляра, по умолчанию равным количеству vCPU. Увеличение этого значения может улучшить производительность чтения за счет большего потребления памяти.
  • Параллелизм вставки (Insert Parallelism) — количество потоков, используемых для вставки данных. Управляется через max_insert_threads. В ClickHouse Cloud определяется размером экземпляра (между 2 и 4), а в OSS по умолчанию равно 1. Увеличение этого значения может улучшить производительность за счет большего потребления памяти.
  • Размер блока вставки (Insert Block Size) — данные обрабатываются в цикле: они извлекаются, парсятся и формируются в блоки вставки в памяти на основе ключа партиционирования. Эти блоки сортируются, оптимизируются, сжимаются и записываются в хранилище как новые части данных. Размер блока вставки, управляемый настройками min_insert_block_size_rows и min_insert_block_size_bytes (в несжатом виде), влияет на использование памяти и дисковый ввод-вывод. Более крупные блоки потребляют больше памяти, но создают меньше частей, снижая I/O и объем фоновых слияний. Эти настройки задают минимальные пороговые значения (как только достигается любое из них, инициируется сброс).
  • Размер блока для материализованного представления (Materialized view block size) — помимо описанной выше механики основной вставки, перед вставкой в материализованные представления блоки также укрупняются для более эффективной обработки. Размер этих блоков определяется настройками min_insert_block_size_bytes_for_materialized_views и min_insert_block_size_rows_for_materialized_views. Более крупные блоки позволяют более эффективно обрабатывать данные за счет большего потребления памяти. По умолчанию эти настройки наследуют значения настроек исходной таблицы min_insert_block_size_rows и min_insert_block_size_bytes соответственно.

Для улучшения производительности пользователи могут следовать рекомендациям, изложенным в разделе Tuning Threads and Block Size for Inserts руководства Optimizing for S3 Insert and Read Performance. В большинстве случаев нет необходимости дополнительно изменять min_insert_block_size_bytes_for_materialized_views и min_insert_block_size_rows_for_materialized_views для повышения производительности. Если вы все же изменяете эти параметры, применяйте те же рекомендуемые практики, которые описаны для min_insert_block_size_rows и min_insert_block_size_bytes.

Чтобы минимизировать потребление памяти, пользователи могут поэкспериментировать с этими настройками. Это неизбежно снизит производительность. Используя предыдущий запрос, ниже мы приводим примеры.

Уменьшение значения max_insert_threads до 1 снижает накладные расходы по памяти.

INSERT INTO pypi_v2
SELECT
    timestamp,
 project
FROM pypi
WHERE timestamp < '2024-12-17 09:00:00'
SETTINGS max_insert_threads = 1

0 rows in set. Elapsed: 27.752 sec. Processed 1.50 billion rows, 33.48 GB (53.89 million rows/s., 1.21 GB/s.)
Peak memory usage: 506.78 MiB.

Мы можем дополнительно снизить потребление памяти, уменьшив значение параметра max_threads до 1.

INSERT INTO pypi_v2
SELECT timestamp, project
FROM pypi
WHERE timestamp < '2024-12-17 09:00:00'
SETTINGS max_insert_threads = 1, max_threads = 1

Ок.

0 строк в наборе. Прошло: 43.907 сек. Обработано 1.50 миллиарда строк, 33.48 ГБ (34.06 миллиона строк/с., 762.54 МБ/с.)
Пиковое использование памяти: 272.53 МиБ.

Наконец, мы можем ещё больше сократить потребление памяти, установив min_insert_block_size_rows равным 0 (что отключает использование этого параметра при определении размера блока) и min_insert_block_size_bytes равным 10485760 (10 МиБ).

INSERT INTO pypi_v2
SELECT
    timestamp,
 project
FROM pypi
WHERE timestamp < '2024-12-17 09:00:00'
SETTINGS max_insert_threads = 1, max_threads = 1, min_insert_block_size_rows = 0, min_insert_block_size_bytes = 10485760

0 rows in set. Elapsed: 43.293 sec. Processed 1.50 billion rows, 33.48 GB (34.54 million rows/s., 773.36 MB/s.)
Peak memory usage: 218.64 MiB.

Наконец, имейте в виду, что уменьшение размеров блоков приводит к большему числу кусков и вызывает более сильную нагрузку на процесс слияний. Как обсуждается здесь, эти настройки следует изменять с осторожностью.

Отсутствует столбец с меткой времени или монотонно возрастающий столбец

Описанные выше процессы предполагают, что в таблице есть столбец с меткой времени или монотонно возрастающий столбец. В некоторых случаях он просто отсутствует. В этом случае мы рекомендуем следующий процесс, который использует многие из шагов, описанных ранее, но требует от пользователей приостановить приём данных.

  1. Приостановите вставки в основную таблицу.
  2. Создайте дубликат основной целевой таблицы, используя синтаксис CREATE AS.
  3. Присоедините партиции из исходной целевой таблицы к дубликату, используя ALTER TABLE ATTACH. Примечание: Эта операция присоединения отличается от ранее использованной операции перемещения. Хотя она и использует жёсткие ссылки, данные в исходной таблице сохраняются.
  4. Создайте новые материализованные представления.
  5. Возобновите вставки. Примечание: Вставки будут обновлять только целевую таблицу, а не дубликат, который будет ссылаться только на исходные данные.
  6. Выполните дозагрузку данных в материализованное представление, применив тот же процесс, что и выше для данных с метками времени, используя дубликат таблицы в качестве источника.

Рассмотрим следующий пример с использованием PyPI и нашего ранее созданного нового материализованного представления pypi_downloads_per_day (будем считать, что мы не можем использовать метку времени):

SELECT count() FROM pypi

┌────count()─┐
│ 2039988137 │ -- 2,04 миллиарда
└────────────┘

1 row in set. Elapsed: 0.003 sec.

-- (1) Приостановите вставки
-- (2) Создайте дубликат целевой таблицы

CREATE TABLE pypi_v2 AS pypi

SELECT count() FROM pypi_v2

┌────count()─┐
│ 2039988137 │ -- 2,04 миллиарда
└────────────┘

1 row in set. Elapsed: 0.004 sec.

-- (3) Присоедините партиции из исходной целевой таблицы к дубликату.

ALTER TABLE pypi_v2
 (ATTACH PARTITION tuple() FROM pypi)

-- (4) Создайте новые материализованные представления

CREATE TABLE pypi_downloads_per_day
(
    `hour` DateTime,
    `project` String,
    `count` Int64
)
ENGINE = SummingMergeTree
ORDER BY (project, hour)

CREATE MATERIALIZED VIEW pypi_downloads_per_day_mv TO pypi_downloads_per_day
AS SELECT
 toStartOfHour(timestamp) as hour,
 project,
    count() AS count
FROM pypi
GROUP BY
    hour,
 project

-- (4) Возобновите вставки. Здесь мы имитируем это, вставляя одну строку.

INSERT INTO pypi SELECT *
FROM pypi
LIMIT 1

SELECT count() FROM pypi

┌────count()─┐
│ 2039988138 │ -- 2,04 миллиарда
└────────────┘

1 row in set. Elapsed: 0.003 sec.

-- обратите внимание, что pypi_v2 содержит то же количество строк, что и ранее

SELECT count() FROM pypi_v2
┌────count()─┐
│ 2039988137 │ -- 2,04 миллиарда
└────────────┘

-- (5) Заполните представление, используя резервную копию pypi_v2

INSERT INTO pypi_downloads_per_day SELECT
 toStartOfHour(timestamp) as hour,
 project,
    count() AS count
FROM pypi_v2
GROUP BY
    hour,
 project

0 rows in set. Elapsed: 3.719 sec. Processed 2.04 billion rows, 47.15 GB (548,57 млн строк/с., 12,68 ГБ/с.)

DROP TABLE pypi_v2;


На предпоследнем шаге выполняется обратное заполнение `pypi_downloads_per_day` с использованием простого подхода `INSERT INTO SELECT`, описанного [ранее](#timestamp-or-monotonically-increasing-column-available). Этот процесс также можно улучшить, применив подход с таблицей Null, задокументированный [выше](#using-a-null-table-engine-for-filling-materialized-views), с опциональным использованием дублирующей таблицы для повышения отказоустойчивости.

Хотя данная операция требует приостановки вставок, промежуточные операции обычно выполняются быстро — это минимизирует прерывание потока данных.