Перейти к основному содержимому
Перейти к основному содержимому

Интеграция Apache Beam и ClickHouse

ClickHouse Supported

Apache Beam — это модель программирования с открытым исходным кодом, которая позволяет разработчикам определять и выполнять как пакетные, так и потоковые (непрерывные) конвейеры обработки данных. Гибкость Apache Beam заключается в его способности поддерживать широкий спектр сценариев обработки данных, от ETL (извлечение, преобразование, загрузка) операций до сложной обработки событий и аналитики в реальном времени. Эта интеграция использует официальный JDBC-коннектор ClickHouse для основы слоя вставки.

Пакет интеграции

Пакет интеграции, необходимый для интеграции Apache Beam и ClickHouse, поддерживается и разрабатывается в рамках Apache Beam I/O Connectors — набора интеграций для многих популярных систем хранения данных и баз данных. Реализация org.apache.beam.sdk.io.clickhouse.ClickHouseIO находится в репозитории Apache Beam.

Установка пакета Apache Beam ClickHouse

Установка пакета

Добавьте следующую зависимость в ваш фреймворк управления пакетами:

Рекомендуемая версия Beam

Коннектор ClickHouseIO рекомендуется для использования начиная с версии Apache Beam 2.59.0. Более ранние версии могут не полностью поддерживать функциональность коннектора.

Артефакты могут быть найдены в официальном maven репозитории.

Пример кода

Следующий пример читает CSV файл с названием input.csv как PCollection, преобразует его в объект Row (используя определенную схему) и вставляет его в локальный экземпляр ClickHouse с использованием ClickHouseIO:

Поддерживаемые типы данных

ClickHouseApache BeamПоддерживаетсяПримечания
TableSchema.TypeName.FLOAT32Schema.TypeName#FLOAT
TableSchema.TypeName.FLOAT64Schema.TypeName#DOUBLE
TableSchema.TypeName.INT8Schema.TypeName#BYTE
TableSchema.TypeName.INT16Schema.TypeName#INT16
TableSchema.TypeName.INT32Schema.TypeName#INT32
TableSchema.TypeName.INT64Schema.TypeName#INT64
TableSchema.TypeName.STRINGSchema.TypeName#STRING
TableSchema.TypeName.UINT8Schema.TypeName#INT16
TableSchema.TypeName.UINT16Schema.TypeName#INT32
TableSchema.TypeName.UINT32Schema.TypeName#INT64
TableSchema.TypeName.UINT64Schema.TypeName#INT64
TableSchema.TypeName.DATESchema.TypeName#DATETIME
TableSchema.TypeName.DATETIMESchema.TypeName#DATETIME
TableSchema.TypeName.ARRAYSchema.TypeName#ARRAY
TableSchema.TypeName.ENUM8Schema.TypeName#STRING
TableSchema.TypeName.ENUM16Schema.TypeName#STRING
TableSchema.TypeName.BOOLSchema.TypeName#BOOLEAN
TableSchema.TypeName.TUPLESchema.TypeName#ROW
TableSchema.TypeName.FIXEDSTRINGFixedBytesFixedBytes — это LogicalType, представляющий массив байтов фиксированной длины, расположенный в org.apache.beam.sdk.schemas.logicaltypes
Schema.TypeName#DECIMAL
Schema.TypeName#MAP

Параметры ClickHouseIO.Write

Вы можете настроить параметры ClickHouseIO.Write с помощью следующих функций-сеттеров:

Функция-сеттер параметраТип аргументаЗначение по умолчаниюОписание
withMaxInsertBlockSize(long maxInsertBlockSize)1000000Максимальный размер блока строк для вставки.
withMaxRetries(int maxRetries)5Максимальное количество повторных попыток для неудачных вставок.
withMaxCumulativeBackoff(Duration maxBackoff)Duration.standardDays(1000)Максимальная накопительная длительность отката для повторов.
withInitialBackoff(Duration initialBackoff)Duration.standardSeconds(5)Начальная длительность отката перед первой попыткой.
withInsertDistributedSync(Boolean sync)trueЕсли true, синхронизирует операции вставки для распределенных таблиц.
withInsertQuorum(Long quorum)nullКоличество реплик, необходимых для подтверждения операции вставки.
withInsertDeduplicate(Boolean deduplicate)trueЕсли true, включается дедупликация для операций вставки.
withTableSchema(TableSchema schema)nullСхема целевой таблицы ClickHouse.

Ограничения

Пожалуйста, учитывайте следующие ограничения при использовании коннектора:

  • На сегодняшний день поддерживается только операция Sink. Коннектор не поддерживает операцию Source.
  • ClickHouse выполняет дедупликацию при вставке в ReplicatedMergeTree или в Distributed таблицу, построенную на основе ReplicatedMergeTree. Без репликации вставка в обычный MergeTree может приводить к дубликатам, если вставка не удалась и затем была успешно повторена. Однако каждый блок вставляется атомарно, и размер блока можно настроить с помощью ClickHouseIO.Write.withMaxInsertBlockSize(long). Дедупликация достигается с использованием контрольных сумм вставленных блоков. Для получения дополнительной информации о дедупликации, пожалуйста, посетите Дедупликация и Настройки дедупликации вставки.
  • Коннектор не выполняет никаких DDL операторов; следовательно, целевая таблица должна существовать до вставки.