Параллельные реплики
Введение
ClickHouse выполняет запросы с невероятной скоростью, но как эти запросы распределяются и распараллеливаются по нескольким серверам?
В этом руководстве мы сначала обсудим, как ClickHouse распределяет запрос между несколькими шардерами через распределенные таблицы, а затем как запрос может использовать несколько реплик для его выполнения.
Шардированная архитектура
В бездисковой архитектуре кластеры обычно разделяются на несколько шардов, при этом каждый шард содержит подмножество всех данных. Распределенная таблица располагается поверх этих шардов, предоставляя объединённый вид на все данные.
Чтения могут быть отправлены в локальную таблицу. Выполнение запроса будет происходить только на указанном шарде, либо может быть отправлено в распределенную таблицу, и в этом случае каждый шард выполнит указанные запросы. Сервер, где был запрошен распределенная таблица, агрегирует данные и отвечает клиенту:

На рисунке выше показано, что происходит, когда клиент запрашивает распределённую таблицу:
Запрос select отправляется в распределённую таблицу на узел произвольно (через стратегию round-robin или после маршрутизации на конкретный сервер через балансировщик нагрузки). Этот узел теперь будет действовать как координатор.
Узел найдет каждый шард, который должен выполнить запрос, используя информацию, указанную распределённой таблицей, и запрос отправляется на каждый шард.
Каждый шард читает, фильтрует и агрегирует данные локально, а затем отправляет обратно состояние, которое можно объединить, к координатору.
Координирующий узел объединяет данные, а затем отправляет ответ клиенту.
Когда мы добавляем реплики в процесс, он остаётся довольно схожим, с единственным отличием, что только одна реплика из каждого шарда выполнит запрос. Это означает, что больше запросов может быть обработано параллельно.
Нешардированная архитектура
ClickHouse Cloud имеет очень другую архитектуру по сравнению с представленной выше. (См. "Архитектура ClickHouse Cloud" для более детальной информации). С разделением вычислений и хранения, и практически неограниченным количеством хранилища, потребность в шардах становится менее важной.
На картинке ниже показана архитектура ClickHouse Cloud:

Эта архитектура позволяет нам добавлять и удалять реплики почти мгновенно, обеспечивая очень высокую масштабируемость кластера. Кластер ClickHouse Keeper (показан справа) обеспечивает наличие единого источника истины для метаданных. Реплики могут получать метаданные из кластера ClickHouse Keeper и поддерживать одинаковые данные. Сами данные хранятся в объектном хранилище, и SSD-кэш позволяет ускорять запросы.
Но как теперь распределить выполнение запроса по нескольким серверам? В шардированной архитектуре это было довольно очевидно, поскольку каждый шард мог фактически выполнить запрос на подмножестве данных. Как это работает, когда шардирования нет?
Введение в параллельные реплики
Для параллельного выполнения запросов через несколько серверов, сначала нужно назначить один из наших серверов координатором. Координатор создает список задач, которые необходимо выполнить, гарантирует их выполнения, агрегирует результаты и возвращает их клиенту. Как и в большинстве распределённых систем, это будет ролью узла, который получил изначальный запрос. Также необходимо определить единицу работы. В шардированной архитектуре единицей работы является шард, подмножество данных. С параллельными репликами мы будем использовать небольшую часть таблицы, называемую гранулы, как единицу работы.
Теперь давайте посмотрим, как это работает на практике, с помощью рисунка ниже:

С параллельными репликами:
Запрос от клиента отправляется на один узел после прохождения через балансировщик нагрузки. Этот узел становится координатором для этого запроса.
Узел анализирует индекс каждой части и выбирает правильные части и гранулы для обработки.
Координатор делит рабочую нагрузку на набор гранул, которые могут быть назначены различным репликам.
Каждый набор гранул обрабатывается соответствующими репликами, и когда они закончат, будет отправлено состояние, которое можно объединить, к координатору.
Наконец, координатор объединяет все результаты от реплик и возвращает ответ клиенту.
Вышеперечисленные шаги показывают, как в теории работают параллельные реплики. Однако на практике существует множество факторов, которые могут помешать работе такой логики:
Некоторые реплики могут быть недоступны.
Репликация в ClickHouse асинхронная, некоторые реплики могут не иметь одинаковых частей в какой-то момент времени.
Задержка между репликами должна как-то обрабатываться.
Кэш файловой системы отличается от реплики к реплике в зависимости от активности на каждой реплике, что означает, что случайное назначение задач может привести к менее оптимальной производительности, учитывая локальность кэша.
Мы рассмотрим, как преодолеваются эти факторы в следующих разделах.
Объявления
Для решения вопросов (1) и (2) из вышеприведённого списка мы ввели понятие объявления. Давайте визуализируем, как это работает, используя рисунок ниже:

Запрос от клиента отправляется на один узел после прохождения через балансировщик нагрузки. Узел становится координатором для этого запроса.
Координатор отправляет запрос на получение объявлений от всех реплик в кластере. Реплики могут иметь слегка разные представления текущего набора частей для таблицы. В результате, необходимо собрать эту информацию, чтобы избежать неверных решений по планированию.
Координатор затем использует объявления для определения набора гранул, которые могут быть назначены различным репликам. Здесь, например, видно, что никакие гранулы из части 3 не были назначены реплике 2, потому что эта реплика не предоставила данную часть в своём объявлении. Также обратите внимание, что ни одна задача не была назначена реплике 3, потому что эта реплика не предоставила объявление.
После того, как каждая реплика обработала запрос на своём подмножестве гранул, и состояние, которое можно объединить, было отправлено к координатору, координатор объединяет результаты и ответ отправляется клиенту.
Динамическая координация
Для решения проблемы задержки, мы добавили динамическую координацию. Это означает, что все гранулы не отправляются реплике одним запросом, но каждая реплика сможет запросить новую задачу (набор гранул для обработки) у координатора. Координатор предоставит реплике набор гранул на основе полученного объявления.
Предположим, что мы находимся на стадии процесса, где все реплики отправили объявление со всеми частями.
Рисунок ниже визуализирует, как работает динамическая координация:

Реплики уведомляют координаторский узел, что они способны обработать задачи, они также могут указать, сколько работы они могут обработать.
Координатор назначает задачи репликам.

Реплики 1 и 2 смогли быстро завершить свои задачи. Они запрашивают другую задачу у координаторского узла.
Координатор назначает новые задачи репликам 1 и 2.

Теперь все реплики завершили обработку своих задач. Они запрашивают больше задач.
Координатор, используя объявления, проверяет, какие задачи остались для обработки, но оставшихся задач нет.
Координатор сообщает репликам, что все было обработано. Теперь он объединит все состояния, которые можно объединить, и ответит на запрос.
Управление локальностью кэша
Последней оставшейся потенциальной проблемой является управление локальностью кэша. Если запрос выполняется несколько раз, как можно обеспечить, что одна и та же задача будет маршрутизироваться к одной и той же реплике? В предыдущем примере у нас были следующие назначенные задачи:
Реплика 1 | Реплика 2 | Реплика 3 | |
---|---|---|---|
Часть 1 | g1, g6, g7 | g2, g4, g5 | g3 |
Часть 2 | g1 | g2, g4, g5 | g3 |
Часть 3 | g1, g6 | g2, g4, g5 | g3 |
Чтобы гарантировать, что одни и те же задачи назначаются одним и тем же репликам и могут воспользоваться кэшем, происходят две вещи. Хэш из части + набор гранул (задача) вычисляется. Модуль числа реплик для назначения задачи применяется.
На бумаге это звучит хорошо, но на практике, внезапная нагрузка на одну реплику или
ухудшение сети, может вводить задержки, если одна и та же реплика постоянно используется
для выполнения определенных задач. Если max_parallel_replicas
меньше
числа реплик, то для выполнения запроса выбираются случайные реплики.
Кража задач
если какая-то реплика обрабатывает задачи медленнее других, другие реплики попытаются 'украсть' задачи, которые по идее принадлежат этой реплике по хэшу, чтобы уменьшить задержку.
Ограничения
Эта функция имеет известные ограничения, основные из которых задокументированы в этом разделе.
Если вы обнаружили проблему, которая не является одной из перечисленных ниже
ограничений и подозреваете, что параллельная реплика может быть причиной, пожалуйста,
откройте проблему на GitHub, используя метку comp-parallel-replicas
.
Ограничение | Описание |
---|---|
Сложные запросы | В настоящее время параллельная реплика работает довольно хорошо для простых запросов. Слои сложности, такие как общие табличные выражения (CTE), подзапросы, JOIN, не плоские запросы и др., могут отрицательно сказываться на производительности запроса. |
Маленькие запросы | Если вы выполняете запрос, который не обрабатывает много строк, то его выполнение на нескольких репликах может не привести к улучшению времени выполнения, учитывая, что сетевое время для координации между репликами может привести к дополнительным циклам при выполнении запроса. Вы можете уменьшить эти проблемы, используя настройку: parallel_replicas_min_number_of_rows_per_replica . |
Параллельные реплики отключены с FINAL | |
Данные с высокой кардинальностью и сложная агрегация | Агрегация с высокой кардинальностью, которая нуждается в отправке большого количества данных, может значительно замедлить ваши запросы. |
Совместимость с новым анализатором | Новый анализатор может значительно замедлить или ускорить выполнение запросов в конкретных сценариях. |
Настройки, связанные с параллельными репликами
Настройка | Описание |
---|---|
enable_parallel_replicas | 0 : отключено1 : включено 2 : Принудительное использование параллельной реплики, выбрасывает исключение, если не используется. |
cluster_for_parallel_replicas | Имя кластера для использования параллельной репликации; если вы используете ClickHouse Cloud, используйте default . |
max_parallel_replicas | Максимальное количество реплик, используемых для выполнения запроса на несколько реплик, если указано число меньше, чем количество реплик в кластере, узлы будут выбраны случайно. Это значение также может быть переполнено для учёта горизонтального масштабирования. |
parallel_replicas_min_number_of_rows_per_replica | Помогает ограничить количество используемых реплик на основе количества строк, которые нужно обработать. Количество реплик определяется: примерное количество строк для чтения / min_number_of_rows_per_replica . |
allow_experimental_analyzer | 0 : использовать старый анализатор1 : использовать новый анализатор. Поведение параллельных реплик может изменяться в зависимости от используемого анализатора. |
Исследование проблем с параллельными репликами
Вы можете проверить, какие настройки используются для каждого запроса в таблице system.query_log
. Вы также можете посмотреть таблицу system.events
, чтобы увидеть все события, которые произошли на сервере, и использовать табличную функцию clusterAllReplicas
для просмотра таблиц на всех репликах (если вы пользователь облака, используйте default
).
Ответ
Таблица system.text_log
также содержит информацию о выполнении запросов с использованием параллельных реплик:
Ответ
Наконец, вы можете использовать EXPLAIN PIPELINE
. Он показывает, как ClickHouse будет выполнять запрос и какие ресурсы будут использованы для выполнения запроса. Возьмём, например, следующий запрос:
Давайте посмотрим на конвейер запросов без параллельной реплики:

А теперь с параллельной репликой:
