Перейти к основному содержанию
Перейти к основному содержанию

Табличная функция s3

Предоставляет табличный интерфейс для чтения и записи файлов в Amazon S3 и Google Cloud Storage. Эта табличная функция аналогична функции hdfs, но поддерживает возможности, специфичные для S3.

Если в вашем кластере несколько реплик, вы можете использовать функцию s3Cluster для распараллеливания вставок.

При использовании табличной функции s3 с INSERT INTO...SELECT данные читаются и вставляются в потоковом режиме. В памяти находятся только несколько блоков данных, пока блоки непрерывно читаются из S3 и отправляются в целевую таблицу.

Синтаксис

s3(url [, NOSIGN | access_key_id, secret_access_key, [session_token]] [,format] [,structure] [,compression_method],[,headers], [,partition_strategy], [,partition_columns_in_data_file])
s3(named_collection[, option=value [,..]])
GCS

Табличная функция S3 интегрируется с Google Cloud Storage, используя GCS XML API и HMAC‑ключи. Дополнительные сведения об endpoint и HMAC см. в документации по интероперабельности Google.

Для GCS подставьте свой HMAC key и HMAC secret в места, где указаны access_key_id и secret_access_key.

Параметры

Табличная функция s3 поддерживает следующие простые параметры:

ParameterDescription
urlURL бакета с путем к файлу. Поддерживает следующие шаблоны в режиме только для чтения: *, **, ?, {abc,def} и {N..M}, где N, M — числа, 'abc', 'def' — строки. Дополнительную информацию см. здесь.
NOSIGNЕсли это ключевое слово указано на месте учетных данных, все запросы не будут подписываться.
access_key_id и secret_access_keyКлючи, задающие учетные данные для использования с данным endpoint. Необязательные.
session_tokenСессионный токен для использования с указанными ключами. Необязателен при передаче ключей.
formatФормат файла.
structureСтруктура таблицы. Формат: 'column1_name column1_type, column2_name column2_type, ...'.
compression_methodПараметр необязателен. Поддерживаемые значения: none, gzip или gz, brotli или br, xz или LZMA, zstd или zst. По умолчанию метод сжатия будет определен автоматически по расширению файла.
headersПараметр необязателен. Позволяет передавать заголовки в запросе S3. Передавайте в формате headers(key=value), например headers('x-amz-request-payer' = 'requester').
partition_strategyПараметр необязателен. Поддерживаемые значения: WILDCARD или HIVE. WILDCARD требует наличия {_partition_id} в пути, который будет заменен на ключ партиции. HIVE не допускает шаблоны, предполагает, что путь — корень таблицы, и генерирует каталоги партиций в стиле Hive с идентификаторами Snowflake в качестве имен файлов и форматом файла в качестве расширения. По умолчанию WILDCARD.
partition_columns_in_data_fileПараметр необязателен. Используется только со стратегией партиционирования HIVE. Указывает ClickHouse, следует ли ожидать, что колонки партиции будут записаны в файл данных. По умолчанию false.
storage_class_nameПараметр необязателен. Поддерживаемые значения: STANDARD или INTELLIGENT_TIERING. Позволяет указать AWS S3 Intelligent Tiering. По умолчанию STANDARD.
GCS

URL для GCS имеет следующий формат, так как endpoint для Google XML API отличается от JSON API:

  https://storage.googleapis.com/<bucket>/<folder>/<filename(s)>

и не https://storage.cloud.google.com.

Аргументы также могут передаваться с помощью именованных коллекций. В этом случае url, access_key_id, secret_access_key, format, structure, compression_method работают так же, и поддерживаются дополнительные параметры:

АргументОписание
filenameдобавляется к URL, если указан.
use_environment_credentialsпо умолчанию включен, позволяет передавать дополнительные параметры с помощью переменных окружения AWS_CONTAINER_CREDENTIALS_RELATIVE_URI, AWS_CONTAINER_CREDENTIALS_FULL_URI, AWS_CONTAINER_AUTHORIZATION_TOKEN, AWS_EC2_METADATA_DISABLED.
no_sign_requestпо умолчанию отключен.
expiration_window_secondsзначение по умолчанию — 120.

Возвращаемое значение

Таблица заданной структуры, предназначенная для чтения или записи данных в указанный файл.

Примеры

Выбор первых 5 строк таблицы из файла в S3 https://datasets-documentation.s3.eu-west-3.amazonaws.com/aapl_stock.csv:

SELECT *
FROM s3(
   'https://datasets-documentation.s3.eu-west-3.amazonaws.com/aapl_stock.csv',
   'CSVWithNames'
)
LIMIT 5;
┌───────Дата─┬─Открытие┬──Максимум┬──Минимум┬─Закрытие┬────Объём─┬─Откр.инт─┐
│ 1984-09-07 │ 0.42388 │ 0.42902 │ 0.41874 │ 0.42388 │ 23220030 │       0 │
│ 1984-09-10 │ 0.42388 │ 0.42516 │ 0.41366 │ 0.42134 │ 18022532 │       0 │
│ 1984-09-11 │ 0.42516 │ 0.43668 │ 0.42516 │ 0.42902 │ 42498199 │       0 │
│ 1984-09-12 │ 0.42902 │ 0.43157 │ 0.41618 │ 0.41618 │ 37125801 │       0 │
│ 1984-09-13 │ 0.43927 │ 0.44052 │ 0.43927 │ 0.43927 │ 57822062 │       0 │
└────────────┴─────────┴──────────┴──────────┴─────────┴──────────┴──────────┘
Примечание

ClickHouse использует расширения имени файла для определения формата данных. Например, мы могли бы выполнить предыдущую команду без CSVWithNames:

SELECT *
FROM s3(
   'https://datasets-documentation.s3.eu-west-3.amazonaws.com/aapl_stock.csv'
)
LIMIT 5;

ClickHouse также может определять метод сжатия файла. Например, если файл был упакован с расширением .csv.gz, ClickHouse автоматически его распакует.

Примечание

Файлы Parquet с именами вида *.parquet.snappy или *.parquet.zstd могут сбивать ClickHouse с толку и вызывать ошибки TOO_LARGE_COMPRESSED_BLOCK или ZSTD_DECODER_FAILED. Это происходит потому, что ClickHouse пытается прочитать весь файл как данные, закодированные в формате Snappy или ZSTD, тогда как на самом деле Parquet применяет сжатие на уровне групп строк и столбцов.

Метаданные Parquet уже указывают сжатие для каждого столбца, поэтому расширение файла является избыточным. В таких случаях вы можете просто использовать compression_method = 'none':

SELECT *
FROM s3(
  'https://<my-bucket>.s3.<my-region>.amazonaws.com/path/to/my-data.parquet.snappy',
  compression_format = 'none'
);

Использование

Предположим, что у нас есть несколько файлов со следующими URI в S3:

Подсчитайте количество строк в файлах, имена которых оканчиваются на цифры от 1 до 3:

SELECT count(*)
FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/my-test-bucket-768/{some,another}_prefix/some_file_{1..3}.csv', 'CSV', 'column1 UInt32, column2 UInt32, column3 UInt32')
┌─count()─┐
│      18 │
└─────────┘

Посчитайте общее количество строк во всех файлах в этих двух каталогах:

SELECT count(*)
FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/my-test-bucket-768/{some,another}_prefix/*', 'CSV', 'column1 UInt32, column2 UInt32, column3 UInt32')
┌─count()─┐
│      24 │
└─────────┘
Совет

Если в перечне файлов используются числовые диапазоны с ведущими нулями, используйте конструкцию с фигурными скобками для каждой цифры по отдельности или символ ?.

Подсчитайте общее количество строк в файлах с именами file-000.csv, file-001.csv, ... , file-999.csv:

SELECT count(*)
FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/my-test-bucket-768/big_prefix/file-{000..999}.csv', 'CSV', 'column1 UInt32, column2 UInt32, column3 UInt32');
┌─count()─┐
│      12 │
└─────────┘

Запишите данные в файл test-data.csv.gz:

INSERT INTO FUNCTION s3('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/test-data.csv.gz', 'CSV', 'name String, value UInt32', 'gzip')
VALUES ('test-data', 1), ('test-data-2', 2);

Выгрузите данные в файл test-data.csv.gz из существующей таблицы:

INSERT INTO FUNCTION s3('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/test-data.csv.gz', 'CSV', 'name String, value UInt32', 'gzip')
SELECT name, value FROM existing_table;

Шаблон glob ** можно использовать для рекурсивного обхода каталогов. В следующем примере будут рекурсивно получены все файлы из каталога my-test-bucket-768:

SELECT * FROM s3('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/**', 'CSV', 'name String, value UInt32', 'gzip');

Ниже приведён запрос, который рекурсивно считывает данные из всех файлов test-data.csv.gz во всех подкаталогах каталога my-test-bucket:

SELECT * FROM s3('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/**/test-data.csv.gz', 'CSV', 'name String, value UInt32', 'gzip');

Примечание. В файле конфигурации сервера можно указать пользовательские сопоставления URL-адресов. Пример:

SELECT * FROM s3('s3://clickhouse-public-datasets/my-test-bucket-768/**/test-data.csv.gz', 'CSV', 'name String, value UInt32', 'gzip');

URL 's3://clickhouse-public-datasets/my-test-bucket-768/**/test-data.csv.gz' будет заменён на 'http://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/**/test-data.csv.gz'

Пользовательский маппер можно добавить в файл config.xml:

<url_scheme_mappers>
   <s3>
      <to>https://{bucket}.s3.amazonaws.com</to>
   </s3>
   <gs>
      <to>https://{bucket}.storage.googleapis.com</to>
   </gs>
   <oss>
      <to>https://{bucket}.oss.aliyuncs.com</to>
   </oss>
</url_scheme_mappers>

Для производственных сценариев рекомендуется использовать именованные коллекции. Пример:


CREATE NAMED COLLECTION creds AS
        access_key_id = '***',
        secret_access_key = '***';
SELECT count(*)
FROM s3(creds, url='https://s3-object-url.csv')

Партиционированная запись

Стратегия разбиения

Поддерживается только для запросов INSERT.

WILDCARD (по умолчанию): заменяет подстановочный символ {_partition_id} в пути к файлу фактическим ключом партиции.

HIVE реализует разбиение в стиле Hive для операций чтения и записи. Генерирует файлы в следующем формате: <prefix>/<key1=val1/key2=val2...>/<snowflakeid>.<toLower(file_format)>.

Пример стратегии разбиения HIVE

INSERT INTO FUNCTION s3(s3_conn, filename='t_03363_function', format=Parquet, partition_strategy='hive') PARTITION BY (year, country) SELECT 2020 as year, 'Россия' as country, 1 as id;
SELECT _path, * FROM s3(s3_conn, filename='t_03363_function/**.parquet');

   ┌─_path──────────────────────────────────────────────────────────────────────┬─id─┬─country─┬─year─┐
1. │ test/t_03363_function/year=2020/country=Russia/7351295896279887872.parquet │  1 │ Russia  │ 2020 │
   └────────────────────────────────────────────────────────────────────────────┴────┴─────────┴──────┘

Примеры стратегии партиционирования WILDCARD

  1. Использование идентификатора партиции в ключе создаёт отдельные файлы:
INSERT INTO TABLE FUNCTION
    s3('http://bucket.amazonaws.com/my_bucket/file_{_partition_id}.csv', 'CSV', 'a String, b UInt32, c UInt32')
    PARTITION BY a VALUES ('x', 2, 3), ('x', 4, 5), ('y', 11, 12), ('y', 13, 14), ('z', 21, 22), ('z', 23, 24);

В результате данные записываются в три файла: file_x.csv, file_y.csv и file_z.csv.

  1. Использование идентификатора партиции в имени бакета создаёт файлы в разных бакетах:
INSERT INTO TABLE FUNCTION
    s3('http://bucket.amazonaws.com/my_bucket_{_partition_id}/file.csv', 'CSV', 'a UInt32, b UInt32, c UInt32')
    PARTITION BY a VALUES (1, 2, 3), (1, 4, 5), (10, 11, 12), (10, 13, 14), (20, 21, 22), (20, 23, 24);

В результате данные записываются в три файла в разных бакетах: my_bucket_1/file.csv, my_bucket_10/file.csv и my_bucket_20/file.csv.

Доступ к публичным бакетам

ClickHouse пытается получить учетные данные из множества разных источников. Иногда это может приводить к проблемам при доступе к некоторым публичным бакетам, из‑за чего клиент возвращает код ошибки 403. Этой проблемы можно избежать, используя ключевое слово NOSIGN, которое заставляет клиент игнорировать все учетные данные и не подписывать запросы.

SELECT *
FROM s3(
   'https://datasets-documentation.s3.eu-west-3.amazonaws.com/aapl_stock.csv',
   NOSIGN,
   'CSVWithNames'
)
LIMIT 5;

Использование учетных данных S3 (ClickHouse Cloud)

Для непубличных бакетов пользователи могут передать aws_access_key_id и aws_secret_access_key функции. Например:

SELECT count() FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/mta/*.tsv', '<KEY>', '<SECRET>','TSVWithNames')

Это подходит для разовых обращений или в случаях, когда учетные данные можно легко регулярно менять. Однако это не рекомендуется как долгосрочное решение для повторяющегося доступа или в ситуациях, когда учетные данные являются конфиденциальными. В этом случае мы рекомендуем полагаться на доступ на основе ролей.

Доступ на основе ролей для S3 в ClickHouse Cloud описан здесь.

После настройки roleARN можно передать в функцию s3 через параметр extra_credentials. Например:

SELECT count() FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/mta/*.tsv','CSVWithNames',extra_credentials(role_arn = 'arn:aws:iam::111111111111:role/ClickHouseAccessRole-001'))

Дополнительные примеры можно найти здесь

Работа с архивами

Предположим, что у нас есть несколько архивных файлов со следующими URI в S3:

Данные из этих архивов можно извлекать с использованием ::. Глоб-шаблоны (glob) могут использоваться как в части URL, так и в части после :: (отвечающей за имя файла внутри архива).

SELECT *
FROM s3(
   'https://s3-us-west-1.amazonaws.com/umbrella-static/top-1m-2018-01-1{0..2}.csv.zip :: *.csv'
);
Примечание

ClickHouse поддерживает три формата архивов: ZIP TAR 7Z Архивы ZIP и TAR можно читать из любого поддерживаемого хранилища, а архивы 7Z — только с локальной файловой системы, на которой установлен ClickHouse.

Вставка данных

Обратите внимание, что строки можно вставлять только в новые файлы. Операции слияния или разбиения файлов не выполняются. После того как файл записан, последующие вставки в него завершатся с ошибкой. Подробнее см. здесь.

Виртуальные столбцы

  • _path — Путь к файлу. Тип: LowCardinality(String). В случае архива показывает путь в формате: "{path_to_archive}::{path_to_file_inside_archive}".
  • _file — Имя файла. Тип: LowCardinality(String). В случае архива показывает имя файла внутри архива.
  • _size — Размер файла в байтах. Тип: Nullable(UInt64). Если размер файла неизвестен, значение — NULL. В случае архива показывает несжатый размер файла внутри архива.
  • _time — Время последнего изменения файла. Тип: Nullable(DateTime). Если время неизвестно, значение — NULL.

настройка use_hive_partitioning

Эта настройка подсказывает ClickHouse, что при чтении нужно разбирать файлы с секционированием в стиле Hive. На операцию записи она не влияет. Для симметричных операций чтения и записи используйте аргумент partition_strategy.

Когда настройка use_hive_partitioning установлена в значение 1, ClickHouse обнаружит секционирование в стиле Hive в пути (/name=value/) и позволит использовать столбцы партиций как виртуальные столбцы в запросе. Эти виртуальные столбцы будут иметь те же имена, что и в секционированном пути, но начинаться с символа _.

Пример

SELECT * FROM s3('s3://data/path/date=*/country=*/code=*/*.parquet') WHERE date > '2020-01-01' AND country = 'Netherlands' AND code = 42;

Доступ к бакетам с оплатой по запросам (requester pays)

Чтобы получить доступ к бакету с оплатой по запросам (requester pays), во всех запросах нужно передавать заголовок x-amz-request-payer = requester. Это можно сделать, передав параметр headers('x-amz-request-payer' = 'requester') в функцию s3. Например:

SELECT
    count() AS num_rows,
    uniqExact(_file) AS num_files
FROM s3('https://coiled-datasets-rp.s3.us-east-1.amazonaws.com/1trc/measurements-100*.parquet', 'AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY', headers('x-amz-request-payer' = 'requester'))

┌───num_rows─┬─num_files─┐
│ 1110000000 │       111 │
└────────────┴───────────┘

Получена 1 строка. Затрачено: 3.089 сек. Обработано 1.09 млрд строк, 0.00 Б (353.55 млн строк/сек., 0.00 Б/сек.)
Пиковое использование памяти: 192.27 КиБ.

Настройки хранения

  • s3_truncate_on_insert - позволяет усечь файл перед вставкой в него. По умолчанию отключено.
  • s3_create_new_file_on_insert - позволяет создавать новый файл при каждой вставке, если формат имеет суффикс. По умолчанию отключено.
  • s3_skip_empty_files - позволяет пропускать пустые файлы при чтении. По умолчанию включено.

Вложенные схемы Avro

При чтении файлов Avro, содержащих вложенные записи, которые различаются между файлами (например, в некоторых файлах есть дополнительное поле внутри вложенного объекта), ClickHouse может вернуть ошибку вида:

The number of leaves in record doesn't match the number of elements in tuple...

Это происходит потому, что ClickHouse ожидает, что все структуры вложенных записей будут соответствовать одной и той же схеме.
Чтобы обработать такую ситуацию, вы можете:

  • Использовать schema_inference_mode='union' для объединения различных схем вложенных записей, или
  • Вручную привести вложенные структуры к единому виду и включить
    use_structure_from_insertion_table_in_table_functions=1.
Замечание о производительности

schema_inference_mode='union' может работать дольше на очень больших наборах данных в S3, поскольку ему необходимо просканировать каждый файл для вывода схемы.

Пример

INSERT INTO data_stage
SELECT
    id,
    data
FROM s3('https://bucket-name/*.avro', 'Avro')
SETTINGS schema_inference_mode='union';