Перейти к основному содержимому

YDB Topics в Go — Исчерпывающее руководство

· 31 мин. чтения

Обложка статьи

Полное описание пакета topic в ydb-go-sdk, написанное для инженера, впервые знакомящегося с YDB Topics. Руководство ведёт от базовых понятий («что такое топик?») через каждый публичный тип, опцию и метод, и заканчивается сквозными рабочими примерами.

Оглавление

  1. Что такое YDB Topic?
  2. Структура пакета
  3. Основные понятия: topictypes
  4. Интерфейс Client
  5. Паттерн опций: topicoptions
  6. Администрирование топика
  7. Запись сообщений: topicwriter
  8. Чтение сообщений, pull-модель: topicreader
  9. Чтение сообщений, push-модель: topiclistener
  10. Вспомогательные утилиты: topicsugar
  11. Сквозные паттерны использования
  12. Краткий справочник

1. Что такое YDB Topic?

YDB Topic — это надёжный, упорядоченный, разделённый на партиции лог сообщений — по форме такой же, как топик в Kafka или Pulsar. Производители (producers) добавляют сообщения с одного конца; потребители (consumers) читают их с того места, на котором остановились.

Ключевая терминология:

  • Топик (Topic) — именованный, персистентный поток байтов. Идентифицируется путём вида /Root/my-topic.
  • Партиция (Partition) — топик разбит на одну или несколько партиций. Каждая партиция — строго упорядоченная последовательность с монотонно возрастающими оффсетами (offsets). Партиции — единица параллелизма.
  • Производитель (Producer / writer) — тот, кто публикует сообщения. Производитель идентифицируется через ProducerID; сообщения с одним и тем же MessageGroupID гарантированно попадут в одну партицию по порядку.
  • Потребитель (Consumer)зарегистрированная роль читателя, привязанная к топику. Концептуально похоже на consumer group в Kafka, но определяется заранее на самом топике. Несколько читателей с одним и тем же именем потребителя разделяют учёт оффсетов.
  • Оффсет (Offset) — позиция сообщения внутри его партиции. Потребители коммитят оффсеты, чтобы фиксировать прогресс.
  • Срок хранения (Retention) — сообщения живут в течение RetentionPeriod или пока топик не превысит RetentionStorageMB, затем удаляются.
  • Важный потребитель (Important consumer) — потребитель с флагом Important=true удерживает сообщения от удаления даже после истечения срока хранения (пока этот потребитель не закоммитит их). Полезно для потребителей, которые обязаны увидеть каждое сообщение.
  • Период доступности (Availability period) — переопределение на уровне потребителя: минимальное время, в течение которого сообщение остаётся доступным для этого потребителя, даже если оно не закоммичено. Позволяет медленному потребителю отстать без потери данных.
  • Кодек (Codec) — сжатие на уровне передачи. Встроенные: Raw, Gzip. Подключаемые: Lzop, Zstd или любые из пользовательского диапазона [10000, 20000).
  • Режим тарификации (MeteringMode) — для serverless YDB: ReservedCapacity против RequestUnits. Определяет, как тарифицируется использование.

Чем это отличается от Kafka?

YDB TopicsKafka
Потребители объявлены на самом топикеConsumer groups возникают со стороны клиента
Важные потребители могут удерживать retentionАналога нет
Период доступности для каждого потребителяТолько глобальный retention топика
Встроенные транзакции на чтение+запись с YDB query/table txТранзакционный API; изолирован от РСУБД
Встроенная поддержка CDC через тот же topic APIОтдельная (Connect / Debezium)

Точка входа для всего нижеизложенного — db.Topic(), возвращающая topic.Client после обычного ydb.Open(...):

db, err := ydb.Open(ctx, "grpc://localhost:2136/local")
if err != nil { log.Fatal(err) }
client := db.Topic()

2. Структура пакета

Go SDK разбивает topic API на семь сотрудничающих пакетов. Сам пакет topic определяет только интерфейс Client; всё остальное — в подпакетах.

Как читать эту диаграмму:

  • topictypes — универсальный словарь, от него зависит каждый другой пакет.
  • topicoptions — слой builder-ов, используемый в вызовах Client.*.
  • topicreader, topicwriter, topiclistener — три альтернативных I/O-интерфейса; выбираете один в зависимости от паттерна доступа.
  • topicsugar — тонкий слой удобств поверх topicreader.

Ссылки: topic/client.go:14, db.Topic() определён на *ydb.Driver.

3. Основные понятия: topictypes

Все публичные типы, описывающие форму топика, потребителей и партиций, находятся в topic/topictypes/topictypes.go. Концептуальная модель:

3.1 Codec

type Codec int32

const (
CodecRaw = Codec(...) // без сжатия
CodecGzip = Codec(...) // встроенный
CodecLzop = Codec(...) // подключите свою библиотеку
CodecZstd = Codec(...) // подключите свою библиотеку
CodecCustomerFirst = Codec(10000) // начало пользовательского диапазона
CodecCustomerEnd = Codec(20000) // не включая
)

Кастомные кодеки регистрируются на writer через topicoptions.WithWriterAddEncoder(codec, factory) и на reader через topicoptions.WithAddDecoder(codec, factory).

3.2 Consumer

type Consumer struct {
Name string // обязательное, уникально внутри топика
Important bool // удерживает retention, если отстаёт
SupportedCodecs []Codec // кодеки, которые этот потребитель может декодировать
ReadFrom time.Time // самая ранняя временная метка сообщения
Attributes map[string]string // произвольные метки
AvailabilityPeriod *time.Duration // nil = значение сервера по умолчанию
}

3.3 PartitionSettings и авто-партицирование

type PartitionSettings struct {
MinActivePartitions int64
MaxActivePartitions int64
PartitionCountLimit int64 // жёсткий потолок
AutoPartitioningSettings AutoPartitioningSettings
}

type AutoPartitioningSettings struct {
AutoPartitioningStrategy AutoPartitioningStrategy // Disabled | ScaleUp | ScaleUpAndDown | Paused
AutoPartitioningWriteSpeedStrategy AutoPartitioningWriteSpeedStrategy // окно + пороги
}

Когда авто-партицирование включено, сервер может разделить «горячую» партицию на двух потомков (записано в ParentPartitionIDs / ChildPartitionIDs структуры PartitionInfo). Читатели должны явно согласиться через topicoptions.WithReaderSupportSplitMergePartitions(true), чтобы корректно обрабатывать split/merge.

3.4 TopicDescription

Возвращается из Client.Describe():

type TopicDescription struct {
Path string
PartitionSettings PartitionSettings
Partitions []PartitionInfo
RetentionPeriod time.Duration
RetentionStorageMB int64
SupportedCodecs []Codec
PartitionWriteBurstBytes int64
PartitionWriteSpeedBytesPerSecond int64
Attributes map[string]string
Consumers []Consumer
MeteringMode MeteringMode
}

3.5 PartitionInfo и PartitionStats

type PartitionInfo struct {
PartitionID int64
Active bool
ChildPartitionIDs []int64 // присутствует, если партиция была разделена
ParentPartitionIDs []int64 // присутствует, если партиция была слита из других
PartitionStats PartitionStats
FromBound, ToBound []byte // диапазон ключей, принадлежащих партиции
}

type PartitionStats struct {
PartitionsOffset OffsetRange // Start (самое раннее живое) → End (следующая запись)
StoreSizeBytes int64
LastWriteTime *time.Time
MaxWriteTimeLag *time.Duration
BytesWritten MultipleWindowsStat // за минуту / час / день
}

3.6 TopicConsumerDescription

Возвращается из Client.DescribeTopicConsumer() — добавляет учёт оффсетов на уровне потребителя для каждой партиции:

type TopicConsumerDescription struct {
Path string
Consumer Consumer
Partitions []DescribeConsumerPartitionInfo
}

type DescribeConsumerPartitionInfo struct {
PartitionID int64
Active bool
ChildPartitionIDs []int64
ParentPartitionIDs []int64
PartitionStats PartitionStats
PartitionConsumerStats PartitionConsumerStats
}

type PartitionConsumerStats struct {
LastReadOffset int64
CommittedOffset int64
ReadSessionID string
PartitionReadSessionCreateTime *time.Time
LastReadTime *time.Time
MaxReadTimeLag *time.Duration
MaxWriteTimeLag *time.Duration
BytesRead MultipleWindowsStat
ReaderName string
}

CommittedOffset — чекпоинт потребителя; LastReadOffset — самая дальняя точка, до которой дочитал активный читатель. Зазор между ними — это работа «в полёте» (in-flight).

3.7 MeteringMode

type MeteringMode int

const (
MeteringModeUnspecified
MeteringModeReservedCapacity // предзакупленная ёмкость
MeteringModeRequestUnits // оплата по запросам
)

4. Интерфейс Client

Полный интерфейс Client находится в topic/client.go:16-87:

type Client interface {
Alter(ctx, path, opts ...AlterOption) error
Create(ctx, path, opts ...CreateOption) error
Describe(ctx, path, opts ...DescribeOption) (TopicDescription, error)
DescribeTopicConsumer(ctx, path, consumer, opts ...DescribeConsumerOption) (TopicConsumerDescription, error)
CommitOffset(ctx, path, partitionID, consumer, offset, opts ...CommitOffsetOption) error
Drop(ctx, path, opts ...DropOption) error

StartListener(consumer, handler, readSelectors, opts ...ListenerOption) (*TopicListener, error)
StartReader(consumer, readSelectors, opts ...ReaderOption) (*Reader, error)
StartWriter(topicPath, opts ...WriterOption) (*Writer, error)
StartTransactionalWriter(tx, topicpath, opts ...WriterOption) (*TxWriter, error)
}

Заметки по отдельным методам:

  • Create / Drop — прямолинейные; идемпотентность не встроена (второй Create вернёт ошибку).
  • Alter — изменяются только те поля, которые вы адресовали через опции. Добавлять/удалять потребителей, менять кодеки, партиции, retention и т. д.
  • Describe vs DescribeTopicConsumer — первый возвращает метаданные по всему топику; второй фокусируется на одном потребителе и его представлении каждой партиции (оффсеты и отставание).
  • CommitOffset — ручной, внеполосный коммит. Сценарии: инструменты, восстановление, replay. Важное замечание из godoc:

Используйте topicoptions.WithCommitOffsetReadSessionID (из reader.ReadSessionID()), чтобы не прерывать текущую сессию чтения. Без него сервер прервёт активную сессию чтения для этой партиции, заставив читателя переподключаться.

Поведение сервера для необычных значений оффсета:

ВходОтвет сервера
offset > конец партицииBAD_REQUEST
offset < 0BAD_REQUEST
offset < закоммиченныйпринят; закоммиченная позиция откатывается
offset == закоммиченныйno-op
  • StartReader / StartWriter / StartListener / StartTransactionalWriter — все четыре быстрые неблокирующие вызовы. gRPC-соединение возвращаемого объекта устанавливается в фоне; если нужно убедиться в успехе, вызовите WaitInit(ctx) перед первой операцией I/O.

  • Экспериментальные APIStartListener, StartTransactionalWriter и Reader.PopMessagesBatchTx помечены как экспериментальные и могут меняться. См. VERSIONING.md.

5. Паттерн опций: topicoptions

Каждый метод Client.* заканчивается на opts ...XxxOption. Каждая опция — типизированное значение, реализующее маркерный интерфейс (CreateOption, AlterOption, ReaderOption и т. п.). Этот паттерн сохраняет стабильность сигнатур по мере добавления новых настроек.

5.1 Маркерные интерфейсы

ИнтерфейсИспользуется
CreateOptionClient.Create
AlterOptionClient.Alter
DescribeOptionClient.Describe
DescribeConsumerOptionClient.DescribeTopicConsumer
DropOptionClient.Drop
CommitOffsetOptionClient.CommitOffset
ReaderOptionClient.StartReader
WriterOptionClient.StartWriter, StartTransactionalWriter
ListenerOptionClient.StartListener

5.2 Часто используемые опции — шпаргалка

Create / Alter

ОпцияЧто делает
CreateWithSupportedCodecs(codecs ...) / AlterWithSupportedCodecs(...)Ограничивает кодеки, принимаемые топиком
CreateWithMinActivePartitions(n) / CreateWithMaxActivePartitions(n)Статический размер партиций
CreateWithPartitionCountLimit(n)Жёсткий потолок
CreateWithAutoPartitioningSettings(s)Включить авто split/merge
CreateWithRetentionPeriod(d) / CreateWithRetentionStorageMB(mb)Как долго / сколько хранить
CreateWithPartitionWriteSpeedBytesPerSecond(n) / CreateWithPartitionWriteBurstBytes(n)Лимиты пропускной способности на партицию
CreateWithAttributes(map)Произвольные метки
CreateWithConsumer(consumers...) / AlterWithAddConsumers(...) / AlterWithDropConsumers(names...)Управление регистрациями потребителей
AlterConsumerWithImportant(name, bool)Переключить флаг удержания retention
AlterConsumerWithReadFrom(name, t)Пропускать сообщения старше t
AlterConsumerWithSupportedCodecs(name, []Codec)Ограничение кодеков для потребителя
AlterConsumerWithAttributes(name, map)Метки потребителя
AlterConsumerWithAvailabilityPeriod(name, d)Минимальный retention для потребителя
AlterConsumerResetAvailabilityPeriod(name)Вернуть значение топика по умолчанию
CreateWithMeteringMode(mode) / AlterWithMeteringMode(mode)Режим тарификации (serverless)

Reader (topicoptions_reader.go)

ОпцияЧто делает
ReadTopic(path)Сокращение, создающее ReadSelectors для одного топика
WithReaderOperationTimeout(d), WithReaderStartTimeout(d)Дедлайны соединения/операции
WithReaderCheckRetryErrorFunction(fn)Своя политика повторов
WithReaderCommitTimeLagTrigger(d), WithReaderCommitCountTrigger(n)Триггеры пакетных коммитов в async-режиме
WithReaderCommitMode(mode)CommitModeAsync (по умолчанию, быстро) или CommitModeSync
WithReaderBatchMaxCount(n), WithReaderBufferSizeBytes(n)Буферизация на приём
WithAddDecoder(codec, factory)Регистрация кастомного кодека
WithReaderGetPartitionStartOffset(fn)Решить, с какого оффсета стартовать в партиции
WithReaderSupportSplitMergePartitions(true)Согласиться на семантику split/merge
WithReaderWithoutConsumer(saveState)Чтение без зарегистрированного потребителя (без серверного учёта оффсетов)
WithReaderOnStopPartitionSession(fn)Колбэк, когда сервер переназначает партицию

Writer (topicoptions_writer.go)

ОпцияЧто делает
WithWriterProducerID(id) / WithProducerID(id)Стабильная идентичность producer-а
WithWriterPartitionID(id) / WithPartitionID(id)Пин записи в конкретную партицию
WithWriterCodec(c) / WithCodec(c)Кодек, используемый в передаче
WithWriterCodecAutoSelect() / WithCodecAutoSelect()Доверить SDK выбор лучшего кодека
WithWriterAddEncoder(codec, factory)Подключить свой кодек
WithWriterCompressorCount(n)Количество goroutine-компрессоров
WithWriterMaxQueueLen(n)Максимум буферизованных сообщений
WithWriterErrOnQueueFull(true)Возвращать ErrQueueLimitExceed вместо блокировки
WithWriterMessageMaxBytesSize(n)Локально отбрасывать сообщения сверх размера
WithSyncWrite(true) / WithWriterWaitServerAck(true)Write блокируется до серверного ACK
WithWriterSetAutoSeqNo(true), WithWriterSetAutoCreatedAt(true)Позволить SDK заполнить SeqNo и CreatedAt
WithWriterSessionMeta(map) / WithWriteSessionMeta(map)Метаданные сессии, видимые на стороне чтения
WithOnWriterFirstConnected(fn)Колбэк после первой инициализации
WithWriterCheckRetryErrorFunction(fn)Своя политика повторов
WithWriteToManyPartitions(multiOpts...)Размазать один writer на все партиции топика (см. §7.8)

CommitOffset

ОпцияЧто делает
WithCommitOffsetReadSessionID(id)Не даёт серверу убить активную сессию чтения при ручном вызове CommitOffset

5.3 Как работает builder (кратко)

Каждый тип опции реализует метод Apply*Option(req *raw.XxxRequest). Метод Client проходит по variadic-слайсу и применяет их по порядку. Конфликтующие опции — кто последний, тот и прав. Можно написать свои опции, реализовав нужный интерфейс, но это редко необходимо.

6. Администрирование топика: Create, Alter, Describe, Drop

Эта секция предполагает, что ctx подготовлен и db, _ := ydb.Open(ctx, connStr) успешно выполнен.

6.1 Создать топик

err := db.Topic().Create(ctx, "topic-path",
topicoptions.CreateWithSupportedCodecs(topictypes.CodecRaw, topictypes.CodecGzip),
topicoptions.CreateWithMinActivePartitions(3),
)

6.2 Создать с important-потребителем + периодом доступности

availability := 24 * time.Hour
err := db.Topic().Create(ctx, "topic-path",
topicoptions.CreateWithConsumer(topictypes.Consumer{
Name: "my-consumer",
Important: true,
AvailabilityPeriod: &availability,
SupportedCodecs: []topictypes.Codec{topictypes.CodecRaw, topictypes.CodecGzip},
}),
)

6.3 Добавить потребителя в существующий топик

err := db.Topic().Alter(ctx, "topic-path",
topicoptions.AlterWithAddConsumers(topictypes.Consumer{
Name: "new-consumer",
SupportedCodecs: []topictypes.Codec{topictypes.CodecRaw, topictypes.CodecGzip},
}),
)

6.4 Изменить / сбросить период доступности потребителя

// Установить 48 часов
_ = db.Topic().Alter(ctx, "topic-path",
topicoptions.AlterConsumerWithAvailabilityPeriod("my-consumer", 48*time.Hour),
)

// Вернуть значение по умолчанию
_ = db.Topic().Alter(ctx, "topic-path",
topicoptions.AlterConsumerResetAvailabilityPeriod("my-consumer"),
)

6.5 Описать топик

desc, err := db.Topic().Describe(ctx, "topic-path")
if err != nil { return err }
fmt.Printf("партиций: %d\n", len(desc.Partitions))
for _, c := range desc.Consumers {
fmt.Printf(" потребитель: %s important=%v\n", c.Name, c.Important)
}

6.6 Описать потребителя (состояние оффсетов по партициям)

desc, err := db.Topic().DescribeTopicConsumer(ctx, "topic-path", "new-consumer")
for _, p := range desc.Partitions {
fmt.Printf("партиция %d: committed=%d lastRead=%d lag=%v\n",
p.PartitionID,
p.PartitionConsumerStats.CommittedOffset,
p.PartitionConsumerStats.LastReadOffset,
p.PartitionConsumerStats.MaxReadTimeLag,
)
}

6.7 Ручной коммит (внеполосный)

// Передавайте вместе с reader.ReadSessionID(), чтобы активная сессия чтения продолжала работать:
sessID := reader.ReadSessionID()
err := db.Topic().CommitOffset(ctx, "topic-path", /*partitionID*/ 0, "my-consumer", /*offset*/ 1234,
topicoptions.WithCommitOffsetReadSessionID(sessID),
)

6.8 Удалить топик

_ = db.Topic().Drop(ctx, "topic-path")

7. Запись сообщений: topicwriter

Writer асинхронный по умолчанию: Write возвращается после того, как сообщения попали в очередь (не обязательно подтверждены сервером). SDK обрабатывает сжатие, переподключение и повторы в фоне.

7.1 Жизненный цикл

7.2 Типичная последовательность записи

7.3 Структура Message

type Message struct {
SeqNo int64 // монотонно растущий на producer-е; SDK может заполнить через WithWriterSetAutoSeqNo
CreatedAt time.Time // SDK может заполнить через WithWriterSetAutoCreatedAt
Data io.Reader // полезная нагрузка; одноразовое чтение
Metadata map[string][]byte // ключ/значение на сообщение
Key string // выбор партиции по ключу (требует поддержки сервера)
PartitionID int64 // выбор партиции по ID (требует поддержки сервера)
Tx tx.Transaction // ставится TxWriter; не задавайте вручную
}

Поле Data — это io.Reader, оно вычитывается один раз goroutine-компрессором. Распространённая идиома — bytes.NewReader([]byte{...}) для разовых нагрузок.

7.4 Методы

МетодНазначение
WaitInit(ctx) errorБлокируется, пока завершится начальное рукопожатие writer-а
WaitInitInfo(ctx) (PublicInitialInfo, error)То же, но возвращает LastSeqNum для продолжения нумерации
Write(ctx, msgs...) errorПоложить сообщения в очередь. Возвращается, когда они в очереди (async) или подтверждены (sync, с WithSyncWrite)
Flush(ctx) errorЖдать, пока каждое in-flight сообщение получит серверный ACK
Close(ctx) errorСлить очередь, закрыть stream. После этого писать нельзя

7.5 Ошибки, о которых нужно знать

ОшибкаЗначение
ErrQueueLimitExceedОчередь полна, и стоит WithWriterErrOnQueueFull(true)
ErrMessagesPutToInternalQueueBeforeErrorWrite вернул ошибку, но часть сообщений попала в буфер и ещё может быть доставлена. Проверяйте через errors.Is(err, ...)
ErrInvalidConfigurationНеверная конфигурация writer-а (например, конфликтующие опции)
ErrUnimplementedTxWriter.WaitInitInfo на multi-writer-транзакционном пути

7.6 Модель back-pressure

  • По умолчанию (блокировка): Write блокируется, пока в очереди не появится место или ctx не будет отменён. Память ограничена, но producer может застопориться.
  • WithWriterErrOnQueueFull(true): Write сразу возвращает ErrQueueLimitExceed. Вы сами реализуете повторы/откат. Полезно, когда лучше отбросить, чем стоять.

7.7 Транзакционный writer (TxWriter)

Экспериментальный. Создаётся через Client.StartTransactionalWriter(tx, path, opts...). Записываете внутри YDB-транзакции (обычно query.Transaction); коммит транзакции атомарно фиксирует и записи.

Ключевые отличия от Writer:

  • Нет повторов. Любая ошибка означает повтор всей транзакции (стандартная транзакционная дисциплина YDB).
  • Нет Flush. Область транзакции диктует долговечность.
  • WaitInitInfo может вернуть ErrUnimplemented, когда под капотом MultiWriterWithTransaction.
err := db.Query().Do(ctx, func(ctx context.Context, s query.Session) error {
tx, err := s.Begin(ctx, query.TxSettings(query.WithSerializableReadWrite()))
if err != nil { return err }
defer tx.Rollback(ctx)

w, err := db.Topic().StartTransactionalWriter(tx, "topic-path")
if err != nil { return err }

if err := w.Write(ctx, topicwriter.Message{Data: bytes.NewReader([]byte("hi"))}); err != nil {
return err // возврат err инициирует rollback
}
return tx.CommitTx(ctx)
})

7.8 Writer на множество партиций (WithWriteToManyPartitions)

По умолчанию экземпляр Writer пишет в одну партицию — её выбирает сервер (случайно/round-robin) либо вы фиксируете её через WithPartitionID(...) / Message.PartitionID. Чтобы развернуть один логический writer на все партиции топика, включите multi-partition режим одной опцией:

w, err := db.Topic().StartWriter("topic-path",
topicoptions.WithWriteToManyPartitions(
topicoptions.WithProducerIDPrefix("svc-1"),
topicoptions.WithWriterIdleTimeout(30*time.Minute),
topicoptions.WithWriterPartitionByKey(topicoptions.BoundPartitionChooser()),
),
)

Что происходит под капотом: SDK держит по одному внутреннему Writer на партицию за публичным *topicwriter.Writer, который вы получили. Каждый вызов w.Write(...) маршрутизирует каждое сообщение в партицию через PartitionChooser, после чего передаёт его соответствующему внутреннему writer-у. Простаивающие writer-ы для отдельных партиций закрываются после WithWriterIdleTimeout.

Это экспериментально. Работает и со StartTransactionalWriter — передайте ту же опцию WithWriteToManyPartitions(...).

7.8.1 Диаграмма последовательности — чем отличается от §7.2

Вспомните простой поток writer-а (§7.2): один Writer → одна внутренняя очередь → одна goroutine-компрессор → один gRPC stream → одна партиция на сервере. Multi-partition writer сохраняет тот же внешний API, но вставляет шаг маршрутизации и разветвляет нижнюю половину:

Что изменилось по сравнению с однопартиционной диаграммой:

Шаг в §7.2 (простой)Шаг здесь (multi)
Writer открывает один stream при initПубличный Writer ещё не открывает producer-stream — он только узнаёт партиции через DescribeTopic
Write кладёт в одну очередьWrite сначала спрашивает PartitionChooser.ChoosePartition(msg), затем кладёт во внутренний writer этой партиции
Один ProducerID, одна последовательность SeqNoПо одному ProducerID на партицию (prefix + ...), у каждого свой счётчик SeqNo; сервер дедуплицирует по паре (ProducerID, SeqNo)
Один компрессор + один stream живут весь срок writer-аУ каждого внутреннего writer-а своя очередь/компрессор/stream; простаивающие закрывает idleWriterManager после WithWriterIdleTimeout
Раскладка партиций фиксируется при initBoundPartitionChooser обновляет границы из DescribeTopic после split/merge и сообщает Chooser-у про новые партиции — прозрачно для вашего кода
Flush(ctx) / Close(ctx) сливает единственную очередьFlush(ctx) / Close(ctx) сливает все активные внутренние writer-ы параллельно

Публичный API (Write, WaitInit, Flush, Close) и структура topicwriter.Message остаются идентичными — снаружи fan-out не виден.

7.8.2 Стратегии маршрутизации (взаимоисключающие)

Выберите один из трёх способов назначить каждое сообщение партиции:

СтратегияОпцияЧто предоставляет сообщениеКогда использовать
По ключу + границы (рекомендуется)WithWriterPartitionByKey(BoundPartitionChooser(...))Message.KeyАвто-разделяющиеся топики; нужна стабильная упорядоченность для каждого ключа сквозь split. Chooser обновляет границы партиций из DescribeTopic.
По ключу + Kafka-хэшWithWriterPartitionByKey(KafkaHashPartitionChooser())Message.KeyМиграция из Kafka, нужна арифметика murmur2 % N.
По явному partition IDWithWriterPartitionByPartitionID()Message.PartitionIDРедкий случай. Вы знаете партицию. Внимание: при split-е писать в закрытую партицию нельзя — writer остановится, ваше приложение должно обнаружить новую раскладку и пересоздать writer.

BoundPartitionChooser принимает WithBoundPartitionChooserPartitioningKeyHasher(fn), если нужно переопределить, как Message.Key хэшируется перед сопоставлением с границами (по умолчанию используется хэш, совместимый с давними C++/Go producer-ами).

7.8.3 Опции multi-writer

ОпцияЧто делает
WithProducerIDPrefix(prefix)Префикс, который SDK использует при генерации ProducerID для каждой партиции. Сервер использует их для дедупликации. Не запускайте два multi-writer-а с одним префиксом одновременно, если их партиции пересекаются.
WithWriterIdleTimeout(d)Как долго простаивающий внутренний writer для партиции живёт перед закрытием. Больше = лучше переиспользование при редких записях; меньше = меньше памяти.
WithWriterPartitionByKey(chooser)Маршрутизация по ключу (см. стратегии выше).
WithWriterPartitionByPartitionID()Маршрутизация по явному ID.
WithBoundPartitionChooserPartitioningKeyHasher(fn)Переопределить хэширование ключа для BoundPartitionChooser.

Хелперы, экспортированные в topicoptions:

  • topicoptions.BoundPartitionChooser(opts...) PartitionChooser
  • topicoptions.KafkaHashPartitionChooser() PartitionChooser

7.8.4 Семантика, о которой стоит знать

  • Сохраняется упорядоченность для каждого ключа. Потребители увидят сообщения с одинаковым Key в порядке отправки producer-ом, даже после того, как split перенесёт этот ключ в новую партицию.
  • Exactly-once через (ProducerID, SeqNo). Сервер подавляет дубликаты с одинаковой парой. Используйте автоматически назначаемые sequence-номера (WithWriterSetAutoSeqNo(true)) — самый простой путь.
  • Знает про авто-split. BoundPartitionChooser обновляет раскладку партиций из DescribeTopic; перекомпилировать/перезапускать на split-е не нужно.
  • Не комбинируйте стратегии. Выберите ровно одну из трёх опций маршрутизации.
  • Совместимость с TxWriter. StartTransactionalWriter(tx, path, WithWriteToManyPartitions(...)) работает так же; обычные правила TxWriter остаются в силе (нет retry, нет Flush).

7.8.5 Рабочий пример — маршрутизация по ключу через границы

w, err := db.Topic().StartWriter("orders",
topicoptions.WithWriteToManyPartitions(
topicoptions.WithProducerIDPrefix("checkout-svc"),
topicoptions.WithWriterIdleTimeout(45*time.Minute),
topicoptions.WithWriterPartitionByKey(topicoptions.BoundPartitionChooser()),
),
)
if err != nil { log.Fatal(err) }
defer w.Close(ctx)

for _, order := range orders {
err := w.Write(ctx, topicwriter.Message{
Key: order.CustomerID, // все сообщения для одного клиента → одна партиция
Data: bytes.NewReader(order.Bytes()),
})
if err != nil { log.Fatal(err) }
}

Смотрите examples/topic/topicwriter/topicwriter_to_many_partitions.go и examples/topic/topicmultiwriter/main.go для дополнительных запускаемых вариантов (Kafka-style маршрутизация, явный partition ID, транзакционный).

7.9 Короткий пример (async-запись, одна партиция)

w, err := db.Topic().StartWriter("topic-path",
topicoptions.WithProducerID("svc-1"),
topicoptions.WithCodec(topictypes.CodecGzip),
)
if err != nil { log.Fatal(err) }

err = w.Write(ctx,
topicwriter.Message{Data: bytes.NewReader([]byte("hello"))},
topicwriter.Message{Data: bytes.NewReader([]byte("world"))},
)
if err != nil { log.Fatal(err) }

if err := w.Close(ctx); err != nil { log.Fatal(err) } // важно: сливает очередь

8. Чтение сообщений, pull-модель: topicreader

Reader — клиент pull-режима: вы вызываете ReadMessage (или ReadMessagesBatch), когда хотите следующее сообщение, и Commit, когда обработали его. SDK держит внутренний буфер, наполняемый фоновой goroutine.

8.1 Модель конкурентности

МетодReadMessageReadMessageBatchCommitClose
ReadMessage--+-
ReadMessageBatch--+-
Commit++--
Close----

Простыми словами: одна goroutine читает, одна коммитит, никогда по две одновременно для одного типа операции. Нарушения вернут ErrConcurrencyCall. Это обеспечено атомарными флагами (readInFlyght, commitInFlyght) в topic/topicreader/reader.go:29-30.

8.2 Жизненный цикл

8.3 Типичная последовательность чтения + коммита (async-коммит)

8.4 Структура Message

type Message struct {
SeqNo int64
CreatedAt time.Time
MessageGroupID string
WriteSessionMetadata map[string]string // session meta со стороны writer-а
Offset int64
WrittenAt time.Time // серверная временная метка
ProducerID string
Metadata map[string][]byte // метаданные сообщения
UncompressedSize int // подсказка отправителя (может быть неверной)
// внутренние поля (commit range, одноразовый reader)
}

Методы:

  • Read(p []byte) (n int, err error) — реализует io.Reader. Стримит раскодированное содержимое. Может вернуть topicreader.ErrUnexpectedCodec, если сообщение сжато незарегистрированным кодеком. Одноразово: данные освобождаются после первой ошибки/EOF.
  • UnmarshalTo(dst MessageContentUnmarshaler) — вызывает dst.UnmarshalYDBTopicMessage(data []byte) с полным payload-ом одним слайсом. Используется хелперами topicsugar.
  • Context() context.Context — отменяется при завершении сессии партиции (переназначение сервером, разрыв). Останавливайте обработку, когда это сработает.
  • Topic() string, PartitionID() int64 — источник сообщения.

8.5 Тип Batch

type Batch struct {
Messages []*Message // гарантированно non-nil, если batch не nil
// ...
}

Batch всегда из одной партиции и в порядке оффсетов. Context(), Topic(), PartitionID() совпадают с интерфейсом Message.

8.6 Методы

МетодНазначение
WaitInit(ctx) errorБлокируется, пока reader не завершит init-рукопожатие
ReadSessionID() stringТекущий ID сессии — передавайте в Client.CommitOffset, чтобы не прерывать сессию
ReadMessage(ctx) (*Message, error)Одно сообщение за раз
ReadMessagesBatch(ctx, opts...) (*Batch, error)До N сообщений из одной партиции, упорядоченные
Commit(ctx, obj CommitRangeGetter) errorЗакоммитить *Message или *Batch; async по умолчанию
PopMessagesBatchTx(ctx, tx, opts...)Прочитать batch и закоммитить его внутри YDB-транзакции (экспериментально)
Close(ctx) errorСлить буфер коммитов; вызывайте перед выходом, иначе потеряете коммиты

8.7 Режимы коммита

  • Async (CommitModeAsync, по умолчанию)Commit кладёт диапазон коммита во внутренний буфер и сразу возвращается. SDK сливает на сервер согласно WithReaderCommitTimeLagTrigger / WithReaderCommitCountTrigger, либо при Close. Самый быстрый, но коммиты не персистентны до слива.
  • Sync (CommitModeSync, через WithReaderCommitMode(...))Commit ждёт сервер. Если сессия партиции была потеряна во время round-trip, возвращает ErrCommitToExpiredSession — это не фатально; reader переподключится, и сообщение будет просто доставлено повторно.

8.8 Ошибки, о которых нужно знать

ОшибкаЗначение
ErrUnexpectedCodecСообщение использовало незарегистрированный кодек. Добавьте декодер через WithAddDecoder
ErrConcurrencyCall (errConcurrencyCallRead / errConcurrencyCallCommit)Вы нарушили контракт конкурентности
ErrCommitToExpiredSessionSync-коммит провалился, потому что сессия переехала. Продолжайте работать

8.9 Транзакционное чтение (экспериментально)

err := db.Query().Do(ctx, func(ctx context.Context, s query.Session) error {
tx, err := s.Begin(ctx, query.TxSettings(query.WithSerializableReadWrite()))
if err != nil { return err }
defer tx.Rollback(ctx)

batch, err := reader.PopMessagesBatchTx(ctx, tx)
if err != nil { return err }

for _, m := range batch.Messages {
// обработать внутри той же tx (например, записать в query-таблицу)
}
return tx.CommitTx(ctx)
})

Если транзакция упадёт, batch будет получен заново на следующей попытке (SDK переподключится для повторной выдачи этих сообщений). Это дорого — держите область транзакции маленькой.

8.10 Короткий пример (цикл по одному сообщению)

r, err := db.Topic().StartReader("my-consumer", topicoptions.ReadTopic("topic-path"))
if err != nil { log.Fatal(err) }
defer r.Close(ctx)

for {
msg, err := r.ReadMessage(ctx)
if err != nil { return err }

data, err := io.ReadAll(msg)
if err != nil { return err }
fmt.Println(string(data))

if err := r.Commit(msg.Context(), msg); err != nil { return err }
}

Обратите внимание на msg.Context(): использование контекста сообщения для коммита значит, что коммит будет отменён, если сессия партиции завершится — нет смысла коммитить в мёртвую сессию.

9. Чтение сообщений, push-модель: topiclistener

Listener — клиент push-режима: вместо того чтобы вы звали Read, сервер пушит события в ваш EventHandler. Listener сам ведёт учёт (старт/стоп сессий партиций, доставку сообщений), а вы пишете методы-колбэки. Экспериментальный.

9.1 Интерфейс EventHandler

type EventHandler interface {
// неэкспортируемый метод гарантирует встраивание BaseHandler — forward compatibility
topicReaderHandler()

// Вызывается один раз после конструирования listener-а (не обязательно подключённого).
OnReaderCreated(event *ReaderReady) error

// Сервер предлагает сессию партиции — вызовите event.Confirm(), чтобы принять.
OnStartPartitionSessionRequest(ctx context.Context, event *EventStartPartitionSession) error

// Сервер прислал batch сообщений.
OnReadMessages(ctx context.Context, event *ReadMessages) error

// Сервер хочет остановить сессию партиции (graceful или forceful).
OnStopPartitionSessionRequest(ctx context.Context, event *EventStopPartitionSession) error
}

Вы обязаны встроить topiclistener.BaseHandler в свою структуру-обработчик. BaseHandler предоставляет реализации по умолчанию для каждого метода и неэкспортируемый маркер, который не позволяет случайно реализовать интерфейс. Когда в интерфейс добавят новые методы, ваш обработчик продолжит компилироваться.

type MyHandler struct {
topiclistener.BaseHandler
}

func (h *MyHandler) OnReadMessages(ctx context.Context, event *topiclistener.ReadMessages) error {
for _, m := range event.Batch.Messages {
// обработать m ...
}
event.Confirm() // async-коммит
return nil
}

9.2 Поток событий listener-а

9.3 Типы событий подробно

EventStartPartitionSession

Сервер выдал этому клиенту новую сессию партиции.

  • event.PartitionSession — идентификаторы (топик, партиция, ID сессии).
  • event.CommittedOffset — последний закоммиченный оффсет потребителя.
  • event.PartitionOffsetsStart/End доступных данных.
  • event.Confirm() — принять с дефолтами (читать с закоммиченного оффсета).
  • event.ConfirmWithParams(StartPartitionSessionConfirm{...}) — принять со своим оффсетом чтения или коммита.

BaseHandler сам вызывает event.Confirm() за вас.

ReadMessages

Batch готов.

  • event.PartitionSession
  • event.BatchMessages []*Message
  • event.Confirm() — запросить серверный коммит; async, не ждёт.
  • event.ConfirmWithAck(ctx) error — запросить коммит и дождаться серверного ACK. Может вернуть ErrCommitToExpiredSession.

EventStopPartitionSession

  • event.Gracefultrue, если сервер вежливо просит (закончите текущее); false, если он уже забрал партицию.
  • Из godoc: может быть вызван более одного раза для сессии партиции: с graceful shutdown и без. Гарантии того, что graceful=false вызовется после graceful=true, нет.
  • event.Confirm() — сигнализирует, что вы закончили работу с этой сессией.

Если Graceful=false, партиция уже переназначена; ваши последующие коммиты в этой сессии провалятся.

9.4 Модель конкурентности

Из godoc пакета:

Методы EventHandler будут вызваны последовательно для одной партиции, но могут вызываться параллельно для разных партиций.

И:

Все методы должны работать быстро, потому что вызываются из основного цикла чтения и блокируют цикл обработки сообщений.

Итого: последовательно внутри одной партиции, параллельно между партициями, никогда не блокируйте обработчик. Выносите тяжёлую работу в свой пул воркеров.

9.5 Методы listener-а

МетодНазначение
WaitInit(ctx) errorБлокируется, пока init-рукопожатие не завершится
WaitStop(ctx) errorБлокируется, пока listener полностью не остановится
ReadSessionID() stringАктивный ID сессии (меняется при переподключении)
Close(ctx) errorОстановить listener

9.6 Reader против Listener: когда что использовать

Аспектtopicreader.Readertopiclistener.TopicListener
МодельPull: вы вызываете ReadMessagePush: SDK вызывает ваш обработчик
Поток управленияТемп чтения определяете выТемп доставки определяет сервер
КонкурентностьОдна goroutine на чтение + одна на коммитGoroutine на партицию (параллельно между партициями)
КоммитЯвный Commit(ctx, obj)event.Confirm() или event.ConfirmWithAck(ctx)
Видимость жизненного цикла партицииВ основном скрыта; явно через WithReaderOnStopPartitionSessionСобытия первого класса
СтатусСтабильныйЭкспериментальный
Лучше всего дляПоследовательных пайплайнов, простых приложенийПараллельной обработки, точечного контроля партиций

10. Вспомогательные утилиты: topicsugar

topicsugar — тонкая обёртка над topicreader.Reader для эргономичного типизированного чтения.

10.1 Unmarshaler-ы для сообщений

import "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicsugar"

// JSON
var payload MyStruct
err := topicsugar.JSONUnmarshal(msg, &payload)

// Protobuf
var pb mypb.Event
err := topicsugar.ProtoUnmarshal(msg, &pb)

// Кастомный unmarshaler
err := topicsugar.UnmarshalMessageWith(msg, yaml.Unmarshal, &payload)

// Сырой callback (data невалиден после возврата из callback)
err := topicsugar.ReadMessageDataWithCallback(msg, func(data []byte) error {
// обработать data — НЕ удерживайте после возврата
return nil
})

Все эти функции оборачивают msg.UnmarshalTo(...) и могут быть вызваны не более одного раза на сообщение.

10.2 Итераторы Go 1.23

Build tag go1.23 включает range-over-func итераторы в topicsugar/iterators.go. Интерфейс итератора — xiter.Seq2[*TypedTopicMessage[T], error], идиоматичен с for ... range.

type Event struct {
UserID string `json:"user_id"`
Kind string `json:"kind"`
}

for msg, err := range topicsugar.JSONIterator[Event](ctx, reader) {
if err != nil {
log.Printf("ошибка чтения: %v", err)
break
}
fmt.Printf("offset=%d user=%s kind=%s\n", msg.Offset, msg.Data.UserID, msg.Data.Kind)
_ = reader.Commit(msg.Context(), msg.Message) // закоммитить нижележащий *Message
}
ИтераторВозвращает
TopicMessageIterator(ctx, r)*topicreader.Message (сырой)
BytesIterator(ctx, r)*TypedTopicMessage[[]byte] (клонированный payload)
StringIterator(ctx, r)*TypedTopicMessage[string]
JSONIterator[T](ctx, r)*TypedTopicMessage[T] (декодирован из JSON)
ProtobufIterator[T proto.Message](ctx, r)*TypedTopicMessage[T] (декодирован из proto)
IteratorFunc[T](ctx, r, customFn)*TypedTopicMessage[T] (ваш unmarshal)

Структура TypedTopicMessage[T] встраивает *topicreader.Message и добавляет типизированное поле Data T, так что у вас сохраняется полный доступ к Offset, CreatedAt, Context() и т. д.

10.3 Поддержка CDC (экспериментально)

YDB Change Data Capture пушит изменения строк как сообщения топика. В topicsugar есть хелперы:

type Row struct {
ID int64
Name string
}

type RowKey struct{ ID int64 }

// реализовать YDBCDCItem, чтобы хелпер знал, как разобрать ключ
func (r *Row) ParseCDCKey(...) error { /* ... */ }

for msg, err := range topicsugar.UnmarshalCDCStream[Row, RowKey](ctx, reader) {
if err != nil { break }
if msg.Erase != nil {
// строка удалена
} else if msg.NewImage != nil {
// строка обновлена/вставлена
}
}

Структура YDBCDCMessage[T, K] предоставляет Update, NewImage, OldImage, Key, Erase — позволяя единообразно обрабатывать insert/update/delete.

11. Сквозные паттерны использования

11.1 Pub-sub с consumer group

package main

import (
"bytes"
"context"
"fmt"
"io"
"log"

ydb "github.com/ydb-platform/ydb-go-sdk/v3"
"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicoptions"
"github.com/ydb-platform/ydb-go-sdk/v3/topic/topictypes"
"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicwriter"
)

func main() {
ctx := context.Background()
db, err := ydb.Open(ctx, "grpc://localhost:2136/local")
if err != nil { log.Fatal(err) }
defer db.Close(ctx)

// 1. Создать топик с потребителем
_ = db.Topic().Create(ctx, "/local/orders",
topicoptions.CreateWithSupportedCodecs(topictypes.CodecRaw, topictypes.CodecGzip),
topicoptions.CreateWithMinActivePartitions(3),
topicoptions.CreateWithConsumer(topictypes.Consumer{
Name: "billing",
SupportedCodecs: []topictypes.Codec{topictypes.CodecRaw, topictypes.CodecGzip},
}),
)

// 2. Producer
go func() {
w, err := db.Topic().StartWriter("/local/orders",
topicoptions.WithProducerID("checkout-svc"),
topicoptions.WithCodec(topictypes.CodecGzip),
)
if err != nil { log.Fatal(err) }
defer w.Close(ctx)

for i := 0; i < 100; i++ {
payload := fmt.Sprintf(`{"order_id": %d}`, i)
if err := w.Write(ctx, topicwriter.Message{Data: bytes.NewReader([]byte(payload))}); err != nil {
log.Fatal(err)
}
}
}()

// 3. Consumer
r, err := db.Topic().StartReader("billing", topicoptions.ReadTopic("/local/orders"))
if err != nil { log.Fatal(err) }
defer r.Close(ctx)

for {
msg, err := r.ReadMessage(ctx)
if err != nil { log.Fatal(err) }
data, _ := io.ReadAll(msg)
fmt.Printf("offset=%d body=%s\n", msg.Offset, data)
_ = r.Commit(msg.Context(), msg)
}
}

11.2 Событийно-управляемая обработка через listener

package main

import (
"context"
"io"
"log"

ydb "github.com/ydb-platform/ydb-go-sdk/v3"
"github.com/ydb-platform/ydb-go-sdk/v3/topic/topiclistener"
"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicoptions"
)

type OrderHandler struct {
topiclistener.BaseHandler // ОБЯЗАТЕЛЬНО — даёт дефолты и маркерный метод
}

func (h *OrderHandler) OnReadMessages(ctx context.Context, event *topiclistener.ReadMessages) error {
for _, m := range event.Batch.Messages {
data, err := io.ReadAll(m)
if err != nil { return err }
log.Printf("order msg p=%d off=%d body=%s", m.PartitionID(), m.Offset, data)
}
event.Confirm() // async-коммит
return nil
}

func main() {
ctx := context.Background()
db, err := ydb.Open(ctx, "grpc://localhost:2136/local")
if err != nil { log.Fatal(err) }
defer db.Close(ctx)

handler := &OrderHandler{}
l, err := db.Topic().StartListener("billing", handler, topicoptions.ReadTopic("/local/orders"))
if err != nil { log.Fatal(err) }

if err := l.WaitInit(ctx); err != nil { log.Fatal(err) }
// ... обработчик теперь работает в фоне

<-ctx.Done()
_ = l.Close(context.Background())
}

11.3 Типизированное чтение через итераторы (Go 1.23+)

type Event struct {
UserID string `json:"user_id"`
Action string `json:"action"`
}

r, _ := db.Topic().StartReader("analytics", topicoptions.ReadTopic("/local/events"))
defer r.Close(ctx)

for msg, err := range topicsugar.JSONIterator[Event](ctx, r) {
if err != nil {
log.Printf("стоп: %v", err)
break
}
process(msg.Data) // типизированный доступ
_ = r.Commit(msg.Context(), msg.Message) // коммит нижележащего *Message
}

12. Краткий справочник

12.1 Методы Client одним взглядом

МетодВозвращаетБлокирует?
Create(ctx, path, opts)errorда
Alter(ctx, path, opts)errorда
Describe(ctx, path, opts)(TopicDescription, error)да
DescribeTopicConsumer(ctx, path, consumer, opts)(TopicConsumerDescription, error)да
CommitOffset(ctx, path, partID, consumer, offset, opts)errorда
Drop(ctx, path, opts)errorда
StartReader(consumer, sel, opts)(*Reader, error)нет (фоновое подключение)
StartWriter(path, opts)(*Writer, error)нет
StartTransactionalWriter(tx, path, opts)(*TxWriter, error)нет
StartListener(consumer, handler, sel, opts)(*TopicListener, error)нет

12.2 Каталог ошибок

ОшибкаГдеЗначение
topicwriter.ErrQueueLimitExceedWriterОчередь полна, включён WithWriterErrOnQueueFull
topicwriter.ErrMessagesPutToInternalQueueBeforeErrorWriterЧасть сообщений попала в буфер до ошибки
topicwriter.ErrInvalidConfigurationWriterКонфликтующие опции
topicwriter.ErrUnimplementedTxWriterWaitInitInfo на multi-writer tx
topicreader.ErrUnexpectedCodecReader.Message.ReadНеизвестный кодек на проводе
topicreader.ErrConcurrencyCall*ReaderНарушение контракта конкурентности
topicreader.ErrCommitToExpiredSessionReader.Commit (sync)Сессия переехала до ACK; не фатально

12.3 Блок-схема выбора


Это руководство отражает состояние ydb-go-sdk/topic на ветке master в мае 2026. Экспериментальные API (транзакционные writer/reader, listener) могут эволюционировать; в случае сомнений сверяйтесь с VERSIONING.md.