Коннектор HTTP Sink для Confluent
Коннектор HTTP Sink не зависим от типа данных и, таким образом, не требует схемы Kafka, а также поддерживает специфические для ClickHouse типы данных, такие как Maps и Arrays. Эта дополнительная гибкость сопровождается незначительным увеличением сложности конфигурации.
Ниже мы описываем простую установку, извлекая сообщения из одной темы Kafka и вставляя строки в таблицу ClickHouse.
Коннектор HTTP предоставляется по лицензии Confluent Enterprise License.
Быстрый старт
1. Соберите ваши данные для подключения
Чтобы подключиться к ClickHouse с помощью HTTP(S), вам нужна следующая информация:
-
ХОСТ и ПОРТ: как правило, порт составляет 8443 при использовании TLS или 8123 при отсутствии TLS.
-
НАЗВАНИЕ БАЗЫ ДАННЫХ: по умолчанию существует база данных с именем
default
, используйте имя базы данных, к которой вы хотите подключиться. -
ИМЯ ПОЛЬЗОВАТЕЛЯ и ПАРОЛЬ: по умолчанию имя пользователя равно
default
. Используйте имя пользователя, соответствующее вашему случаю.
Сведения о вашем ClickHouse Cloud-сервисе доступны в консоли ClickHouse Cloud. Выберите сервис, к которому вы будете подключаться, и нажмите Подключиться:

Выберите HTTPS, и детали доступны в примере команды curl
.

Если вы используете самоуправляемый ClickHouse, детали подключения устанавливаются вашим администратором ClickHouse.
2. Запустите Kafka Connect и коннектор HTTP Sink
У вас есть два варианта:
-
Самоуправляемый: Скачайте пакет Confluent и установите его локально. Следуйте инструкциям по установке для установки коннектора, описанным здесь. Если вы используете метод установки confluent-hub, ваши локальные файлы конфигурации будут обновлены.
-
Confluent Cloud: Полностью управляемая версия HTTP Sink доступна для тех, кто использует Confluent Cloud для хостинга их Kafka. Это требует, чтобы ваша среда ClickHouse была доступна из Confluent Cloud.
Следующие примеры используют Confluent Cloud.
3. Создайте целевую таблицу в ClickHouse
Перед проверкой подключения давайте начнем с создания тестовой таблицы в ClickHouse Cloud, эта таблица будет принимать данные из Kafka:
4. Настройка HTTP Sink
Создайте тему Kafka и экземпляр коннектора HTTP Sink:

Настройте HTTP Sink Connector:
- Укажите название темы, которую вы создали
- Аутентификация
HTTP Url
- URL ClickHouse Cloud с указанным запросомINSERT
<protocol>://<clickhouse_host>:<clickhouse_port>?query=INSERT%20INTO%20<database>.<table>%20FORMAT%20JSONEachRow
. Примечание: запрос должен быть закодирован.Тип аутентификации для EndPoint
- BASICИмя пользователя для аутентификации
- имя пользователя ClickHouseПароль аутентификации
- пароль ClickHouse
Этот HTTP Url подвержен ошибкам. Убедитесь, что экранирование точное, чтобы избежать проблем.

- Конфигурация
Формат значения записи Kafka
Зависит от ваших исходных данных, но в большинстве случаев это будет JSON или Avro. Мы предполагаемJSON
в следующих настройках.- В разделе
расширенные настройки
:Метод HTTP-запроса
- Установите на POSTФормат тела запроса
- jsonРазмер пакетной загрузки
- В соответствии с рекомендациями ClickHouse, установите это значение не менее 1000.Пакет json как массив
- trueПовторная попытка по HTTP-кодам
- 400-500, но адаптируйте по мере необходимости, например, это может измениться, если у вас есть HTTP-прокси перед ClickHouse.Максимальные повторные попытки
- по умолчанию (10) это значение подходит, но вы можете настроить для более надежных повторных попыток.

5. Проверка подключения
Создайте сообщение в теме, настроенной вашим HTTP Sink

и проверьте, что созданное сообщение было записано в вашу инстанцию ClickHouse.
Устранение неполадок
HTTP Sink не объединяет сообщения
Коннектор HTTP Sink не объединяет запросы для сообщений, содержащих разные значения заголовков Kafka.
- Убедитесь, что ваши записи Kafka имеют одинаковый ключ.
- Когда вы добавляете параметры к URL HTTP API, каждая запись может привести к уникальному URL. По этой причине объединение отключено при использовании дополнительных параметров URL.
400 Неверный запрос
CANNOT_PARSE_QUOTED_STRING
Если HTTP Sink выдает сообщение об ошибке при вставке объекта JSON в столбец String
:
Установите параметр input_format_json_read_objects_as_strings=1
в URL как закодированную строку SETTINGS%20input_format_json_read_objects_as_strings%3D1
Загрузка набора данных GitHub (по желанию)
Обратите внимание, что этот пример сохраняет поля Array набора данных Github. Мы предполагаем, что у вас есть пустая тема github в примерах и используется kcat для вставки сообщений в Kafka.
1. Подготовьте конфигурацию
Следуйте этим инструкциям для настройки Connect в зависимости от вашего типа установки, учитывая различия между автономным и распределенным кластером. Если вы используете Confluent Cloud, актуальна настройка для распределенной установки.
Самый важный параметр - это http.api.url
. HTTP интерфейс для ClickHouse требует, чтобы вы закодировали оператор INSERT как параметр в URL. Это должно включать формат (JSONEachRow
в этом случае) и целевую базу данных. Формат должен соответствовать данным Kafka, которые будут преобразованы в строку в HTTP полезной нагрузке. Эти параметры должны быть URL-экранированы. Пример этого формата для набора данных Github (при условии, что вы запускаете ClickHouse локально) показан ниже:
Следующие дополнительные параметры имеют отношение к использованию HTTP Sink с ClickHouse. Полный список параметров можно найти здесь:
request.method
- Установите на POSTretry.on.status.codes
- Установите на 400-500 для повторной попытки при любых кодах ошибок. Уточняйте в зависимости от ожидаемых ошибок в данных.request.body.format
- В большинстве случаев это будет JSON.auth.type
- Установите на BASIC, если вы используете безопасность с ClickHouse. Другие совместимые механизмы аутентификации ClickHouse в настоящее время не поддерживаются.ssl.enabled
- установите значение true, если используете SSL.connection.user
- имя пользователя для ClickHouse.connection.password
- пароль для ClickHouse.batch.max.size
- количество строк для отправки в одном пакете. Убедитесь, что установлено подходящее значение. Согласно рекомендациям ClickHouse рекомендации следует рассматривать значение 1000 как минимум.tasks.max
- Коннектор HTTP Sink поддерживает запуск одной или нескольких задач. Это может быть использовано для повышения производительности. Наряду с размером пакета это ваши основные средства для улучшения производительности.key.converter
- установите в зависимости от типов ваших ключей.value.converter
- установите на основе типа данных в вашей теме. Эти данные не требуют схемы. Формат здесь должен соответствовать ФОРМАТУ, указанному в параметреhttp.api.url
. Проще всего здесь использовать JSON и конвертер org.apache.kafka.connect.json.JsonConverter. Также возможно рассматривать значение как строку через конвертер org.apache.kafka.connect.storage.StringConverter, хотя это потребует от пользователя извлечь значение в операторах вставки, используя функции. Формат Avro также поддерживается в ClickHouse при использовании конвертера io.confluent.connect.avro.AvroConverter.
Полный список настроек, включая то, как настроить прокси, повторные попытки и расширенный SSL, можно найти здесь.
Пример конфигурационных файлов для образцов данных Github можно найти здесь, при условии, что Connect работает в автономном режиме, а Kafka размещён в Confluent Cloud.
2. Создайте таблицу ClickHouse
Убедитесь, что таблица была создана. Пример для минимального набора данных github с использованием стандартного MergeTree показан ниже.
3. Добавьте данные в Kafka
Вставьте сообщения в Kafka. Ниже используется kcat для вставки 10 тыс. сообщений.
Простое чтение из целевой таблицы "Github" должно подтвердить вставку данных.