YDB Topics в Go — Исчерпывающее руководство
Полное описание пакета topic в ydb-go-sdk, написанное для инженера, впервые знакомящегося с YDB Topics. Руководство ведёт от базовых понятий («что такое топик?») через каждый публичный тип, опцию и метод, и заканчивается сквозными рабочими примерами.
Оглавление
- Что такое YDB Topic?
- Структура пакета
- Основные понятия:
topictypes - Интерфейс
Client - Паттерн опций:
topicoptions - Администрирование топика
- Запись сообщений:
topicwriter - Чтение сообщений, pull-модель:
topicreader - Чтение сообщений, push-модель:
topiclistener - Вспомогательные утилиты:
topicsugar - Сквозные паттерны использования
- Краткий справочник
1. Что такое YDB Topic?
YDB Topic — это надёжный, упорядоченный, разделённый на партиции лог сообщений — по форме такой же, как топик в Kafka или Pulsar. Производители (producers) добавляют сообщения с одного конца; потребители (consumers) читают их с того места, на котором остановились.
Ключевая терминология:
- Топик (Topic) — именованный, персистентный поток байтов. Идентифицируется путём вида
/Root/my-topic. - Партиция (Partition) — топик разбит на одну или несколько партиций. Каждая партиция — строго упорядоченная последовательность с монотонно возрастающими оффсетами (offsets). Партиции — единица параллелизма.
- Производитель (Producer / writer) — тот, кто публикует сообщения. Производитель идентифицируется через
ProducerID; сообщения с одним и тем жеMessageGroupIDгарантированно попадут в одну партицию по порядку. - Потребитель (Consumer) — зарегистрированная роль читателя, привязанная к топику. Концептуально похоже на consumer group в Kafka, но определяется заранее на самом топике. Несколько читателей с одним и тем же именем потребителя разделяют учёт оффсетов.
- Оффсет (Offset) — позиция сообщения внутри его партиции. Потребители коммитят оффсеты, чтобы фиксировать прогресс.
- Срок хранения (Retention) — сообщения живут в течение
RetentionPeriodили пока топик не превыситRetentionStorageMB, затем удаляются. - Важный потребитель (Important consumer) — потребитель с флагом
Important=trueудерживает сообщения от удаления даже после истечения срока хранения (пока этот потребитель не закоммитит их). Полезно для потребителей, которые обязаны увидеть каждое сообщение. - Период доступности (Availability period) — переопределение на уровне потребителя: минимальное время, в течение которого сообщение остаётся доступным для этого потребителя, даже если оно не закоммичено. Позволяет медленному потребителю отстать без потери данных.
- Кодек (Codec) — сжатие на уровне передачи. Встроенные:
Raw,Gzip. Подключаемые:Lzop,Zstdили любые из пользовательского диапазона[10000, 20000). - Режим тарификации (MeteringMode) — для serverless YDB:
ReservedCapacityпротивRequestUnits. Определяет, как тарифицируется использование.
Чем это отличается от Kafka?
| YDB Topics | Kafka |
|---|---|
| Потребители объявлены на самом топике | Consumer groups возникают со стороны клиента |
| Важные потребители могут удерживать retention | Аналога нет |
| Период доступности для каждого потребителя | Только глобальный retention топика |
| Встроенные транзакции на чтение+запись с YDB query/table tx | Транзакционный API; изолирован от РСУБД |
| Встроенная поддержка CDC через тот же topic API | Отдельная (Connect / Debezium) |
Точка входа для всего нижеизложенного — db.Topic(), возвращающая topic.Client после обычного ydb.Open(...):
db, err := ydb.Open(ctx, "grpc://localhost:2136/local")
if err != nil { log.Fatal(err) }
client := db.Topic()
2. Структура пакета
Go SDK разбивает topic API на семь сотрудничающих пакетов. Сам пакет topic определяет только интерфейс Client; всё остальное — в подпакетах.
Как читать эту диаграмму:
topictypes— универсальный словарь, от него зависит каждый другой пакет.topicoptions— слой builder-ов, используемый в вызовахClient.*.topicreader,topicwriter,topiclistener— три альтернативных I/O-интерфейса; выбираете один в зависимости от паттерна доступа.topicsugar— тонкий слой удобств поверхtopicreader.
Ссылки: topic/client.go:14, db.Topic() определён на *ydb.Driver.
3. Основные понятия: topictypes
Все публичные типы, описывающие форму топика, потребителей и партиций, находятся в topic/topictypes/topictypes.go. Концептуальная модель:
3.1 Codec
type Codec int32
const (
CodecRaw = Codec(...) // без сжатия
CodecGzip = Codec(...) // встроенный
CodecLzop = Codec(...) // подключите свою библиотеку
CodecZstd = Codec(...) // подключите свою библиотеку
CodecCustomerFirst = Codec(10000) // начало пользовательского диапазона
CodecCustomerEnd = Codec(20000) // не включая
)
Кастомные кодеки регистрируются на writer через topicoptions.WithWriterAddEncoder(codec, factory) и на reader через topicoptions.WithAddDecoder(codec, factory).
3.2 Consumer
type Consumer struct {
Name string // обязательное, уникально внутри топика
Important bool // удерживает retention, если отстаёт
SupportedCodecs []Codec // кодеки, которые этот потребитель может декодировать
ReadFrom time.Time // самая ранняя временная метка сообщения
Attributes map[string]string // произвольные метки
AvailabilityPeriod *time.Duration // nil = значение сервера по умолчанию
}
3.3 PartitionSettings и авто-партицирование
type PartitionSettings struct {
MinActivePartitions int64
MaxActivePartitions int64
PartitionCountLimit int64 // жёсткий потолок
AutoPartitioningSettings AutoPartitioningSettings
}
type AutoPartitioningSettings struct {
AutoPartitioningStrategy AutoPartitioningStrategy // Disabled | ScaleUp | ScaleUpAndDown | Paused
AutoPartitioningWriteSpeedStrategy AutoPartitioningWriteSpeedStrategy // окно + пороги
}
Когда авто-партицирование включено, сервер может разделить «горячую» партицию на двух потомков (записано в ParentPartitionIDs / ChildPartitionIDs структуры PartitionInfo). Читатели должны явно согласиться через topicoptions.WithReaderSupportSplitMergePartitions(true), чтобы корректно обрабатывать split/merge.
3.4 TopicDescription
Возвращается из Client.Describe():
type TopicDescription struct {
Path string
PartitionSettings PartitionSettings
Partitions []PartitionInfo
RetentionPeriod time.Duration
RetentionStorageMB int64
SupportedCodecs []Codec
PartitionWriteBurstBytes int64
PartitionWriteSpeedBytesPerSecond int64
Attributes map[string]string
Consumers []Consumer
MeteringMode MeteringMode
}
3.5 PartitionInfo и PartitionStats
type PartitionInfo struct {
PartitionID int64
Active bool
ChildPartitionIDs []int64 // присутствует, если партиция была разделена
ParentPartitionIDs []int64 // присутствует, если партиция была слита из других
PartitionStats PartitionStats
FromBound, ToBound []byte // диапазон ключей, принадлежащих партиции
}
type PartitionStats struct {
PartitionsOffset OffsetRange // Start (самое раннее живое) → End (следующая запись)
StoreSizeBytes int64
LastWriteTime *time.Time
MaxWriteTimeLag *time.Duration
BytesWritten MultipleWindowsStat // за минуту / час / день
}
3.6 TopicConsumerDescription
Возвращается из Client.DescribeTopicConsumer() — добавляет учёт оффсетов на уровне потребителя для каждой партиции:
type TopicConsumerDescription struct {
Path string
Consumer Consumer
Partitions []DescribeConsumerPartitionInfo
}
type DescribeConsumerPartitionInfo struct {
PartitionID int64
Active bool
ChildPartitionIDs []int64
ParentPartitionIDs []int64
PartitionStats PartitionStats
PartitionConsumerStats PartitionConsumerStats
}
type PartitionConsumerStats struct {
LastReadOffset int64
CommittedOffset int64
ReadSessionID string
PartitionReadSessionCreateTime *time.Time
LastReadTime *time.Time
MaxReadTimeLag *time.Duration
MaxWriteTimeLag *time.Duration
BytesRead MultipleWindowsStat
ReaderName string
}
CommittedOffset — чекпоинт потребителя; LastReadOffset — самая дальняя точка, до которой дочитал активный читатель. Зазор между ними — это работа «в полёте» (in-flight).
3.7 MeteringMode
type MeteringMode int
const (
MeteringModeUnspecified
MeteringModeReservedCapacity // предзакупленная ёмкость
MeteringModeRequestUnits // оплата по запросам
)
4. Интерфейс Client
Полный интерфейс Client находится в topic/client.go:16-87:
type Client interface {
Alter(ctx, path, opts ...AlterOption) error
Create(ctx, path, opts ...CreateOption) error
Describe(ctx, path, opts ...DescribeOption) (TopicDescription, error)
DescribeTopicConsumer(ctx, path, consumer, opts ...DescribeConsumerOption) (TopicConsumerDescription, error)
CommitOffset(ctx, path, partitionID, consumer, offset, opts ...CommitOffsetOption) error
Drop(ctx, path, opts ...DropOption) error
StartListener(consumer, handler, readSelectors, opts ...ListenerOption) (*TopicListener, error)
StartReader(consumer, readSelectors, opts ...ReaderOption) (*Reader, error)
StartWriter(topicPath, opts ...WriterOption) (*Writer, error)
StartTransactionalWriter(tx, topicpath, opts ...WriterOption) (*TxWriter, error)
}
Заметки по отдельным методам:
Create/Drop— прямолинейные; идемпотентность не встроена (второйCreateвернёт ошибку).Alter— изменяются только те поля, которые вы адресовали через опции. Добавлять/удалять потребителей, менять кодеки, партиции, retention и т. д.DescribevsDescribeTopicConsumer— первый возвращает метаданные по всему топику; второй фокусируется на одном потребителе и его представлении каждой партиции (оффсеты и отставание).CommitOffset— ручной, внеполосный коммит. Сценарии: инструменты, восстановление, replay. Важное замечание из godoc:
Используйте
topicoptions.WithCommitOffsetReadSessionID(изreader.ReadSessionID()), чтобы не прерывать текущую сессию чтения. Без него сервер прервёт активную сессию чтения для этой партиции, заставив читателя переподключаться.
Поведение сервера для необычных значений оффсета:
| Вход | Ответ сервера |
|---|---|
offset > конец партиции | BAD_REQUEST |
offset < 0 | BAD_REQUEST |
offset < закоммиченный | принят; закоммиченная позиция откатывается |
offset == закоммиченный | no-op |
-
StartReader/StartWriter/StartListener/StartTransactionalWriter— все четыре быстрые неблокирующие вызовы. gRPC-соединение возвращаемого объекта устанавливается в фоне; если нужно убедиться в успехе, вызовитеWaitInit(ctx)перед первой операцией I/O. -
Экспериментальные API —
StartListener,StartTransactionalWriterиReader.PopMessagesBatchTxпомечены как экспериментальные и могут меняться. См. VERSIONING.md.
5. Паттерн опций: topicoptions
Каждый метод Client.* заканчивается на opts ...XxxOption. Каждая опция — типизированное значение, реализующее маркерный интерфейс (CreateOption, AlterOption, ReaderOption и т. п.). Этот паттерн сохраняет стабильность сигнатур по мере добавления новых настроек.
5.1 Маркерные интерфейсы
| Интерфейс | Используется |
|---|---|
CreateOption | Client.Create |
AlterOption | Client.Alter |
DescribeOption | Client.Describe |
DescribeConsumerOption | Client.DescribeTopicConsumer |
DropOption | Client.Drop |
CommitOffsetOption | Client.CommitOffset |
ReaderOption | Client.StartReader |
WriterOption | Client.StartWriter, StartTransactionalWriter |
ListenerOption | Client.StartListener |
5.2 Часто используемые опции — шпаргалка
Create / Alter
| Опция | Что делает |
|---|---|
CreateWithSupportedCodecs(codecs ...) / AlterWithSupportedCodecs(...) | Ограничивает кодеки, принимаемые топиком |
CreateWithMinActivePartitions(n) / CreateWithMaxActivePartitions(n) | Статический размер партиций |
CreateWithPartitionCountLimit(n) | Жёсткий потолок |
CreateWithAutoPartitioningSettings(s) | Включить авто split/merge |
CreateWithRetentionPeriod(d) / CreateWithRetentionStorageMB(mb) | Как долго / сколько хранить |
CreateWithPartitionWriteSpeedBytesPerSecond(n) / CreateWithPartitionWriteBurstBytes(n) | Лимиты пропускной способности на партицию |
CreateWithAttributes(map) | Произвольные метки |
CreateWithConsumer(consumers...) / AlterWithAddConsumers(...) / AlterWithDropConsumers(names...) | Управление регистрациями потребителей |
AlterConsumerWithImportant(name, bool) | Переключить флаг удержания retention |
AlterConsumerWithReadFrom(name, t) | Пропускать сообщения старше t |
AlterConsumerWithSupportedCodecs(name, []Codec) | Ограничение кодеков для потребителя |
AlterConsumerWithAttributes(name, map) | Метки потребителя |
AlterConsumerWithAvailabilityPeriod(name, d) | Минимальный retention для потребителя |
AlterConsumerResetAvailabilityPeriod(name) | Вернуть значение топика по умолчанию |
CreateWithMeteringMode(mode) / AlterWithMeteringMode(mode) | Режим тарификации (serverless) |
Reader (topicoptions_reader.go)
| Опция | Что делает |
|---|---|
ReadTopic(path) | Сокращение, создающее ReadSelectors для одного топика |
WithReaderOperationTimeout(d), WithReaderStartTimeout(d) | Дедлайны соединения/операции |
WithReaderCheckRetryErrorFunction(fn) | Своя политика повторов |
WithReaderCommitTimeLagTrigger(d), WithReaderCommitCountTrigger(n) | Триггеры пакетных коммитов в async-режиме |
WithReaderCommitMode(mode) | CommitModeAsync (по умолчанию, быстро) или CommitModeSync |
WithReaderBatchMaxCount(n), WithReaderBufferSizeBytes(n) | Буферизация на приём |
WithAddDecoder(codec, factory) | Регистрация кастомного кодека |
WithReaderGetPartitionStartOffset(fn) | Решить, с какого оффсета стартовать в партиции |
WithReaderSupportSplitMergePartitions(true) | Согласиться на семантику split/merge |
WithReaderWithoutConsumer(saveState) | Чтение без зарегистрированного потребителя (без серверного учёта оффсетов) |
WithReaderOnStopPartitionSession(fn) | Колбэк, когда сервер переназначает партицию |
Writer (topicoptions_writer.go)
| Опция | Что делает |
|---|---|
WithWriterProducerID(id) / WithProducerID(id) | Стабильная идентичность producer-а |
WithWriterPartitionID(id) / WithPartitionID(id) | Пин записи в конкретную партицию |
WithWriterCodec(c) / WithCodec(c) | Кодек, используемый в передаче |
WithWriterCodecAutoSelect() / WithCodecAutoSelect() | Доверить SDK выбор лучшего кодека |
WithWriterAddEncoder(codec, factory) | Подключить свой кодек |
WithWriterCompressorCount(n) | Количество goroutine-компрессоров |
WithWriterMaxQueueLen(n) | Максимум буферизованных сообщений |
WithWriterErrOnQueueFull(true) | Возвращать ErrQueueLimitExceed вместо блокировки |
WithWriterMessageMaxBytesSize(n) | Локально отбрасывать сообщения сверх размера |
WithSyncWrite(true) / WithWriterWaitServerAck(true) | Write блокируется до серверного ACK |
WithWriterSetAutoSeqNo(true), WithWriterSetAutoCreatedAt(true) | Позволить SDK заполнить SeqNo и CreatedAt |
WithWriterSessionMeta(map) / WithWriteSessionMeta(map) | Метаданные сессии, видимые на стороне чтения |
WithOnWriterFirstConnected(fn) | Колбэк после первой инициализации |
WithWriterCheckRetryErrorFunction(fn) | Своя политика повторов |
WithWriteToManyPartitions(multiOpts...) | Размазать один writer на все партиции топика (см. §7.8) |
CommitOffset
| Опция | Что делает |
|---|---|
WithCommitOffsetReadSessionID(id) | Не даёт серверу убить активную сессию чтения при ручном вызове CommitOffset |
5.3 Как работает builder (кратко)
Каждый тип опции реализует метод Apply*Option(req *raw.XxxRequest). Метод Client проходит по variadic-слайсу и применяет их по порядку. Конфликтующие опции — кто последний, тот и прав. Можно написать свои опции, реализовав нужный интерфейс, но это редко необходимо.
6. Администрирование топика: Create, Alter, Describe, Drop
Эта секция предполагает, что ctx подготовлен и db, _ := ydb.Open(ctx, connStr) успешно выполнен.
6.1 Создать топик
err := db.Topic().Create(ctx, "topic-path",
topicoptions.CreateWithSupportedCodecs(topictypes.CodecRaw, topictypes.CodecGzip),
topicoptions.CreateWithMinActivePartitions(3),
)
6.2 Создать с important-потребителем + периодом доступности
availability := 24 * time.Hour
err := db.Topic().Create(ctx, "topic-path",
topicoptions.CreateWithConsumer(topictypes.Consumer{
Name: "my-consumer",
Important: true,
AvailabilityPeriod: &availability,
SupportedCodecs: []topictypes.Codec{topictypes.CodecRaw, topictypes.CodecGzip},
}),
)
6.3 Добавить потребителя в существующий топик
err := db.Topic().Alter(ctx, "topic-path",
topicoptions.AlterWithAddConsumers(topictypes.Consumer{
Name: "new-consumer",
SupportedCodecs: []topictypes.Codec{topictypes.CodecRaw, topictypes.CodecGzip},
}),
)
6.4 Изменить / сбросить период доступности потребителя
// Установить 48 часов
_ = db.Topic().Alter(ctx, "topic-path",
topicoptions.AlterConsumerWithAvailabilityPeriod("my-consumer", 48*time.Hour),
)
// Вернуть значение по умолчанию
_ = db.Topic().Alter(ctx, "topic-path",
topicoptions.AlterConsumerResetAvailabilityPeriod("my-consumer"),
)
6.5 Описать топик
desc, err := db.Topic().Describe(ctx, "topic-path")
if err != nil { return err }
fmt.Printf("партиций: %d\n", len(desc.Partitions))
for _, c := range desc.Consumers {
fmt.Printf(" потребитель: %s important=%v\n", c.Name, c.Important)
}
6.6 Описать потребителя (состояние оффсетов по партициям)
desc, err := db.Topic().DescribeTopicConsumer(ctx, "topic-path", "new-consumer")
for _, p := range desc.Partitions {
fmt.Printf("партиция %d: committed=%d lastRead=%d lag=%v\n",
p.PartitionID,
p.PartitionConsumerStats.CommittedOffset,
p.PartitionConsumerStats.LastReadOffset,
p.PartitionConsumerStats.MaxReadTimeLag,
)
}
6.7 Ручной коммит (внеполосный)
// Передавайте вместе с reader.ReadSessionID(), чтобы активная сессия чтения продолжала работать:
sessID := reader.ReadSessionID()
err := db.Topic().CommitOffset(ctx, "topic-path", /*partitionID*/ 0, "my-consumer", /*offset*/ 1234,
topicoptions.WithCommitOffsetReadSessionID(sessID),
)
6.8 Удалить топик
_ = db.Topic().Drop(ctx, "topic-path")
7. Запись сообщений: topicwriter
Writer асинхронный по умолчанию: Write возвращается после того, как сообщения попали в очередь (не обязательно подтверждены сервером). SDK обрабатывает сжатие, переподключение и повторы в фоне.
7.1 Жизненный цикл
7.2 Типичная последовательность записи
7.3 Структура Message
type Message struct {
SeqNo int64 // монотонно растущий на producer-е; SDK может заполнить через WithWriterSetAutoSeqNo
CreatedAt time.Time // SDK может заполнить через WithWriterSetAutoCreatedAt
Data io.Reader // полезная нагрузка; одноразовое чтение
Metadata map[string][]byte // ключ/значение на сообщение
Key string // выбор партиции по ключу (требует поддержки сервера)
PartitionID int64 // выбор партиции по ID (требует поддержки сервера)
Tx tx.Transaction // ставится TxWriter; не задавайте вручную
}
Поле Data — это io.Reader, оно вычитывается один раз goroutine-компрессором. Распространённая идиома — bytes.NewReader([]byte{...}) для разовых нагрузок.
7.4 Методы
| Метод | Назначение |
|---|---|
WaitInit(ctx) error | Блокируется, пока завершится начальное рукопожатие writer-а |
WaitInitInfo(ctx) (PublicInitialInfo, error) | То же, но возвращает LastSeqNum для продолжения нумерации |
Write(ctx, msgs...) error | Положить сообщения в очередь. Возвращается, когда они в очереди (async) или подтверждены (sync, с WithSyncWrite) |
Flush(ctx) error | Ждать, пока каждое in-flight сообщение получит серверный ACK |
Close(ctx) error | Слить очередь, закрыть stream. После этого писать нельзя |
7.5 Ошибки, о которых нужно знать
| Ошибка | Значение |
|---|---|
ErrQueueLimitExceed | Очередь полна, и стоит WithWriterErrOnQueueFull(true) |
ErrMessagesPutToInternalQueueBeforeError | Write вернул ошибку, но часть сообщений попала в буфер и ещё может быть доставлена. Проверяйте через errors.Is(err, ...) |
ErrInvalidConfiguration | Неверная конфигурация writer-а (например, конфликтующие опции) |
ErrUnimplemented | TxWriter.WaitInitInfo на multi-writer-транзакционном пути |
7.6 Модель back-pressure
- По умолчанию (блокировка):
Writeблокируется, пока в очереди не появится место илиctxне будет отменён. Память ограничена, но producer может застопориться. WithWriterErrOnQueueFull(true):Writeсразу возвращаетErrQueueLimitExceed. Вы сами реализуете повторы/откат. Полезно, когда лучше отбросить, чем стоять.
7.7 Транзакционный writer (TxWriter)
Экспериментальный. Создаётся через Client.StartTransactionalWriter(tx, path, opts...). Записываете внутри YDB-транзакции (обычно query.Transaction); коммит транзакции атомарно фиксирует и записи.
Ключевые отличия от Writer:
- Нет повторов. Любая ошибка означает повтор всей транзакции (стандартная транзакционная дисциплина YDB).
- Нет
Flush. Область транзакции диктует долговечность. WaitInitInfoможет вернутьErrUnimplemented, когда под капотомMultiWriterWithTransaction.
err := db.Query().Do(ctx, func(ctx context.Context, s query.Session) error {
tx, err := s.Begin(ctx, query.TxSettings(query.WithSerializableReadWrite()))
if err != nil { return err }
defer tx.Rollback(ctx)
w, err := db.Topic().StartTransactionalWriter(tx, "topic-path")
if err != nil { return err }
if err := w.Write(ctx, topicwriter.Message{Data: bytes.NewReader([]byte("hi"))}); err != nil {
return err // возврат err инициирует rollback
}
return tx.CommitTx(ctx)
})
7.8 Writer на множество партиций (WithWriteToManyPartitions)
По умолчанию экземпляр Writer пишет в одну партицию — её выбирает сервер (случайно/round-robin) либо вы фиксируете её через WithPartitionID(...) / Message.PartitionID. Чтобы развернуть один логический writer на все партиции топика, включите multi-partition режим одной опцией:
w, err := db.Topic().StartWriter("topic-path",
topicoptions.WithWriteToManyPartitions(
topicoptions.WithProducerIDPrefix("svc-1"),
topicoptions.WithWriterIdleTimeout(30*time.Minute),
topicoptions.WithWriterPartitionByKey(topicoptions.BoundPartitionChooser()),
),
)
Что происходит под капотом: SDK держит по одному внутреннему Writer на партицию за публичным *topicwriter.Writer, который вы получили. Каждый вызов w.Write(...) маршрутизирует каждое сообщение в партицию через PartitionChooser, после чего передаёт его соответствующему внутреннему writer-у. Простаивающие writer-ы для отдельных партиций закрываются после WithWriterIdleTimeout.
Это экспериментально. Работает и со StartTransactionalWriter — передайте ту же опцию WithWriteToManyPartitions(...).
7.8.1 Диаграмма последовательности — чем отличается от §7.2
Вспомните простой поток writer-а (§7.2): один Writer → одна внутренняя очередь → одна goroutine-компрессор → один gRPC stream → одна партиция на сервере. Multi-partition writer сохраняет тот же внешний API, но вставляет шаг маршрутизации и разветвляет нижнюю половину:
Что изменилось по сравнению с однопартиционной диаграммой:
| Шаг в §7.2 (простой) | Шаг здесь (multi) |
|---|---|
Writer открывает один stream при init | Публичный Writer ещё не открывает producer-stream — он только узнаёт партиции через DescribeTopic |
Write кладёт в одну очередь | Write сначала спрашивает PartitionChooser.ChoosePartition(msg), затем кладёт во внутренний writer этой партиции |
Один ProducerID, одна последовательность SeqNo | По одному ProducerID на партицию (prefix + ...), у каждого свой счётчик SeqNo; сервер дедуплицирует по паре (ProducerID, SeqNo) |
| Один компрессор + один stream живут весь срок writer-а | У каждого внутреннего writer-а своя очередь/компрессор/stream; простаивающие закрывает idleWriterManager после WithWriterIdleTimeout |
| Раскладка партиций фиксируется при init | BoundPartitionChooser обновляет границы из DescribeTopic после split/merge и сообщает Chooser-у про новые партиции — прозрачно для вашего кода |
Flush(ctx) / Close(ctx) сливает единственную очередь | Flush(ctx) / Close(ctx) сливает все активные внутренние writer-ы параллельно |
Публичный API (Write, WaitInit, Flush, Close) и структура topicwriter.Message остаются идентичными — снаружи fan-out не виден.
7.8.2 Стратегии маршрутизации (взаимоисключающие)
Выберите один из трёх способов назначить каждое сообщение партиции:
| Стратегия | Опция | Что предоставляет сообщение | Когда использовать |
|---|---|---|---|
| По ключу + границы (рекомендуется) | WithWriterPartitionByKey(BoundPartitionChooser(...)) | Message.Key | Авто-разделяющиеся топики; нужна стабильная упорядоченность для каждого ключа сквозь split. Chooser обновляет границы партиций из DescribeTopic. |
| По ключу + Kafka-хэш | WithWriterPartitionByKey(KafkaHashPartitionChooser()) | Message.Key | Миграция из Kafka, нужна арифметика murmur2 % N. |
| По явному partition ID | WithWriterPartitionByPartitionID() | Message.PartitionID | Редкий случай. Вы знаете партицию. Внимание: при split-е писать в закрытую партицию нельзя — writer остановится, ваше приложение должно обнаружить новую раскладку и пересоздать writer. |
BoundPartitionChooser принимает WithBoundPartitionChooserPartitioningKeyHasher(fn), если нужно переопределить, как Message.Key хэшируется перед сопоставлением с границами (по умолчанию используется хэш, совместимый с давними C++/Go producer-ами).
7.8.3 Опции multi-writer
| Опция | Что делает |
|---|---|
WithProducerIDPrefix(prefix) | Префикс, который SDK использует при генерации ProducerID для каждой партиции. Сервер использует их для дедупликации. Не запускайте два multi-writer-а с одним префиксом одновременно, если их партиции пересекаются. |
WithWriterIdleTimeout(d) | Как долго простаивающий внутренний writer для партиции живёт перед закрытием. Больше = лучше переиспользование при редких записях; меньше = меньше памяти. |
WithWriterPartitionByKey(chooser) | Маршрутизация по ключу (см. стратегии выше). |
WithWriterPartitionByPartitionID() | Маршрутизация по явному ID. |
WithBoundPartitionChooserPartitioningKeyHasher(fn) | Переопределить хэширование ключа для BoundPartitionChooser. |
Хелперы, экспортированные в topicoptions:
topicoptions.BoundPartitionChooser(opts...) PartitionChoosertopicoptions.KafkaHashPartitionChooser() PartitionChooser
7.8.4 Семантика, о которой стоит знать
- Сохраняется упорядоченность для каждого ключа. Потребители увидят сообщения с одинаковым
Keyв порядке отправки producer-ом, даже после того, как split перенесёт этот ключ в новую партицию. - Exactly-once через (ProducerID, SeqNo). Сервер подавляет дубликаты с одинаковой парой. Используйте автоматически назначаемые sequence-номера (
WithWriterSetAutoSeqNo(true)) — самый простой путь. - Знает про авто-split.
BoundPartitionChooserобновляет раскладку партиций изDescribeTopic; перекомпилировать/перезапускать на split-е не нужно. - Не комбинируйте стратегии. Выберите ровно одну из трёх опций маршрутизации.
- Совместимость с
TxWriter.StartTransactionalWriter(tx, path, WithWriteToManyPartitions(...))работает так же; обычные правилаTxWriterостаются в силе (нет retry, нет Flush).
7.8.5 Рабочий пример — маршрутизация по ключу через границы
w, err := db.Topic().StartWriter("orders",
topicoptions.WithWriteToManyPartitions(
topicoptions.WithProducerIDPrefix("checkout-svc"),
topicoptions.WithWriterIdleTimeout(45*time.Minute),
topicoptions.WithWriterPartitionByKey(topicoptions.BoundPartitionChooser()),
),
)
if err != nil { log.Fatal(err) }
defer w.Close(ctx)
for _, order := range orders {
err := w.Write(ctx, topicwriter.Message{
Key: order.CustomerID, // все сообщения для одного клиента → одна партиция
Data: bytes.NewReader(order.Bytes()),
})
if err != nil { log.Fatal(err) }
}
Смотрите examples/topic/topicwriter/topicwriter_to_many_partitions.go и examples/topic/topicmultiwriter/main.go для дополнительных запускаемых вариантов (Kafka-style маршрутизация, явный partition ID, транзакционный).
7.9 Короткий пример (async-запись, одна партиция)
w, err := db.Topic().StartWriter("topic-path",
topicoptions.WithProducerID("svc-1"),
topicoptions.WithCodec(topictypes.CodecGzip),
)
if err != nil { log.Fatal(err) }
err = w.Write(ctx,
topicwriter.Message{Data: bytes.NewReader([]byte("hello"))},
topicwriter.Message{Data: bytes.NewReader([]byte("world"))},
)
if err != nil { log.Fatal(err) }
if err := w.Close(ctx); err != nil { log.Fatal(err) } // важно: сливает очередь
8. Чтение сообщений, pull-модель: topicreader
Reader — клиент pull-режима: вы вызываете ReadMessage (или ReadMessagesBatch), когда хотите следующее сообщение, и Commit, когда обработали его. SDK держит внутренний буфер, наполняемый фоновой goroutine.
8.1 Модель конкурентности
| Метод | ReadMessage | ReadMessageBatch | Commit | Close |
|---|---|---|---|---|
| ReadMessage | - | - | + | - |
| ReadMessageBatch | - | - | + | - |
| Commit | + | + | - | - |
| Close | - | - | - | - |
Простыми словами: одна goroutine читает, одна коммитит, никогда по две одновременно для одного типа операции. Нарушения вернут ErrConcurrencyCall. Это обеспечено атомарными флагами (readInFlyght, commitInFlyght) в topic/topicreader/reader.go:29-30.
8.2 Жизненный цикл
8.3 Типичная последовательность чтения + коммита (async-коммит)
8.4 Структура Message
type Message struct {
SeqNo int64
CreatedAt time.Time
MessageGroupID string
WriteSessionMetadata map[string]string // session meta со стороны writer-а
Offset int64
WrittenAt time.Time // серверная временная метка
ProducerID string
Metadata map[string][]byte // метаданные сообщения
UncompressedSize int // подсказка отправителя (может быть неверной)
// внутренние поля (commit range, одноразовый reader)
}
Методы:
Read(p []byte) (n int, err error)— реализуетio.Reader. Стримит раскодированное содержимое. Может вернутьtopicreader.ErrUnexpectedCodec, если сообщение сжато незарегистрированным кодеком. Одноразово: данные освобождаются после первой ошибки/EOF.UnmarshalTo(dst MessageContentUnmarshaler)— вызываетdst.UnmarshalYDBTopicMessage(data []byte)с полным payload-ом одним слайсом. Используется хелперамиtopicsugar.Context() context.Context— отменяется при завершении сессии партиции (переназначение сервером, разрыв). Останавливайте обработку, когда это сработает.Topic() string,PartitionID() int64— источник сообщения.
8.5 Тип Batch
type Batch struct {
Messages []*Message // гарантированно non-nil, если batch не nil
// ...
}
Batch всегда из одной партиции и в порядке оффсетов. Context(), Topic(), PartitionID() совпадают с интерфейсом Message.
8.6 Методы
| Метод | Назначение |
|---|---|
WaitInit(ctx) error | Блокируется, пока reader не завершит init-рукопожатие |
ReadSessionID() string | Текущий ID сессии — передавайте в Client.CommitOffset, чтобы не прерывать сессию |
ReadMessage(ctx) (*Message, error) | Одно сообщение за раз |
ReadMessagesBatch(ctx, opts...) (*Batch, error) | До N сообщений из одной партиции, упорядоченные |
Commit(ctx, obj CommitRangeGetter) error | Закоммитить *Message или *Batch; async по умолчанию |
PopMessagesBatchTx(ctx, tx, opts...) | Прочитать batch и закоммитить его внутри YDB-транзакции (экспериментально) |
Close(ctx) error | Слить буфер коммитов; вызывайте перед выходом, иначе потеряете коммиты |
8.7 Режимы коммита
- Async (
CommitModeAsync, по умолчанию) —Commitкладёт диапазон коммита во внутренний буфер и сразу возвращается. SDK сливает на сервер согласноWithReaderCommitTimeLagTrigger/WithReaderCommitCountTrigger, либо приClose. Самый быстрый, но коммиты не персистентны до слива. - Sync (
CommitModeSync, черезWithReaderCommitMode(...)) —Commitждёт сервер. Если сессия партиции была потеряна во время round-trip, возвращаетErrCommitToExpiredSession— это не фатально; reader переподключится, и сообщение будет просто доставлено повторно.
8.8 Ошибки, о которых нужно знать
| Ошибка | Значение |
|---|---|
ErrUnexpectedCodec | Сообщение использовало незарегистрированный кодек. Добавьте декодер через WithAddDecoder |
ErrConcurrencyCall (errConcurrencyCallRead / errConcurrencyCallCommit) | Вы нарушили контракт конкурентности |
ErrCommitToExpiredSession | Sync-коммит провалился, потому что сессия переехала. Продолжайте работать |
8.9 Транзакционное чтение (экспериментально)
err := db.Query().Do(ctx, func(ctx context.Context, s query.Session) error {
tx, err := s.Begin(ctx, query.TxSettings(query.WithSerializableReadWrite()))
if err != nil { return err }
defer tx.Rollback(ctx)
batch, err := reader.PopMessagesBatchTx(ctx, tx)
if err != nil { return err }
for _, m := range batch.Messages {
// обработать внутри той же tx (например, записать в query-таблицу)
}
return tx.CommitTx(ctx)
})
Если транзакция упадёт, batch будет получен заново на следующей попытке (SDK переподключится для повторной выдачи этих сообщений). Это дорого — держите область транзакции маленькой.
8.10 Короткий пример (цикл по одному сообщению)
r, err := db.Topic().StartReader("my-consumer", topicoptions.ReadTopic("topic-path"))
if err != nil { log.Fatal(err) }
defer r.Close(ctx)
for {
msg, err := r.ReadMessage(ctx)
if err != nil { return err }
data, err := io.ReadAll(msg)
if err != nil { return err }
fmt.Println(string(data))
if err := r.Commit(msg.Context(), msg); err != nil { return err }
}
Обратите внимание на msg.Context(): использование контекста сообщения для коммита значит, что коммит будет отменён, если сессия партиции завершится — нет смысла коммитить в мёртвую сессию.
9. Чтение сообщений, push-модель: topiclistener
Listener — клиент push-режима: вместо того чтобы вы звали Read, сервер пушит события в ваш EventHandler. Listener сам ведёт учёт (старт/стоп сессий партиций, доставку сообщений), а вы пишете методы-колбэки. Экспериментальный.
9.1 Интерфейс EventHandler
type EventHandler interface {
// неэкспортируемый метод гарантирует встраивание BaseHandler — forward compatibility
topicReaderHandler()
// Вызывается один раз после конструирования listener-а (не обязательно подключённого).
OnReaderCreated(event *ReaderReady) error
// Сервер предлагает сессию партиции — вызовите event.Confirm(), чтобы принять.
OnStartPartitionSessionRequest(ctx context.Context, event *EventStartPartitionSession) error
// Сервер прислал batch сообщений.
OnReadMessages(ctx context.Context, event *ReadMessages) error
// Сервер хочет остановить сессию партиции (graceful или forceful).
OnStopPartitionSessionRequest(ctx context.Context, event *EventStopPartitionSession) error
}
Вы обязаны встроить topiclistener.BaseHandler в свою структуру-обработчик. BaseHandler предоставляет реализации по умолчанию для каждого метода и неэкспортируемый маркер, который не позволяет случайно реализовать интерфейс. Когда в интерфейс добавят новые методы, ваш обработчик продолжит компилироваться.
type MyHandler struct {
topiclistener.BaseHandler
}
func (h *MyHandler) OnReadMessages(ctx context.Context, event *topiclistener.ReadMessages) error {
for _, m := range event.Batch.Messages {
// обработать m ...
}
event.Confirm() // async-коммит
return nil
}
9.2 Поток событий listener-а
9.3 Типы событий подробно
EventStartPartitionSession
Сервер выдал этому клиенту новую сессию партиции.
event.PartitionSession— идентификаторы (топик, партиция, ID сессии).event.CommittedOffset— последний закоммиченный оффсет потребителя.event.PartitionOffsets—Start/Endдоступных данных.event.Confirm()— принять с дефолтами (читать с закоммиченного оффсета).event.ConfirmWithParams(StartPartitionSessionConfirm{...})— принять со своим оффсетом чтения или коммита.
BaseHandler сам вызывает event.Confirm() за вас.
ReadMessages
Batch готов.
event.PartitionSessionevent.Batch—Messages []*Messageevent.Confirm()— запросить серверный коммит; async, не ждёт.event.ConfirmWithAck(ctx) error— запросить коммит и дождаться серверного ACK. Может вернутьErrCommitToExpiredSession.
EventStopPartitionSession
event.Graceful—true, если сервер вежливо просит (закончите текущее);false, если он уже забрал партицию.- Из godoc: может быть вызван более одного раза для сессии партиции: с graceful shutdown и без. Гарантии того, что graceful=false вызовется после graceful=true, нет.
event.Confirm()— сигнализирует, что вы закончили работу с этой сессией.
Если Graceful=false, партиция уже переназначена; ваши последующие коммиты в этой сессии провалятся.
9.4 Модель конкурентности
Из godoc пакета:
Методы EventHandler будут вызваны последовательно для одной партиции, но могут вызываться параллельно для разных партиций.
И:
Все методы должны работать быстро, потому что вызываются из основного цикла чтения и блокируют цикл обработки сообщений.
Итого: последовательно внутри одной партиции, параллельно между партициями, никогда не блокируйте обработчик. Выносите тяжёлую работу в свой пул воркеров.
9.5 Методы listener-а
| Метод | Назначение |
|---|---|
WaitInit(ctx) error | Блокируется, пока init-рукопожатие не завершится |
WaitStop(ctx) error | Блокируется, пока listener полностью не остановится |
ReadSessionID() string | Активный ID сессии (меняется при переподключении) |
Close(ctx) error | Остановить listener |
9.6 Reader против Listener: когда что использовать
| Аспект | topicreader.Reader | topiclistener.TopicListener |
|---|---|---|
| Модель | Pull: вы вызываете ReadMessage | Push: SDK вызывает ваш обработчик |
| Поток управления | Темп чтения определяете вы | Темп доставки определяет сервер |
| Конкурентность | Одна goroutine на чтение + одна на коммит | Goroutine на партицию (параллельно между партициями) |
| Коммит | Явный Commit(ctx, obj) | event.Confirm() или event.ConfirmWithAck(ctx) |
| Видимость жизненного цикла партиции | В основном скрыта; явно через WithReaderOnStopPartitionSession | События первого класса |
| Статус | Стабильный | Экспериментальный |
| Лучше всего для | Последовательных пайплайнов, простых приложений | Параллельной обработки, точечного контроля партиций |
10. Вспомогательные утилиты: topicsugar
topicsugar — тонкая обёртка над topicreader.Reader для эргономичного типизированного чтения.
10.1 Unmarshaler-ы для сообщений
import "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicsugar"
// JSON
var payload MyStruct
err := topicsugar.JSONUnmarshal(msg, &payload)
// Protobuf
var pb mypb.Event
err := topicsugar.ProtoUnmarshal(msg, &pb)
// Кастомный unmarshaler
err := topicsugar.UnmarshalMessageWith(msg, yaml.Unmarshal, &payload)
// Сырой callback (data невалиден после возврата из callback)
err := topicsugar.ReadMessageDataWithCallback(msg, func(data []byte) error {
// обработать data — НЕ удерживайте после возврата
return nil
})
Все эти функции оборачивают msg.UnmarshalTo(...) и могут быть вызваны не более одного раза на сообщение.
10.2 Итераторы Go 1.23
Build tag go1.23 включает range-over-func итераторы в topicsugar/iterators.go. Интерфейс итератора — xiter.Seq2[*TypedTopicMessage[T], error], идиоматичен с for ... range.
type Event struct {
UserID string `json:"user_id"`
Kind string `json:"kind"`
}
for msg, err := range topicsugar.JSONIterator[Event](ctx, reader) {
if err != nil {
log.Printf("ошибка чтения: %v", err)
break
}
fmt.Printf("offset=%d user=%s kind=%s\n", msg.Offset, msg.Data.UserID, msg.Data.Kind)
_ = reader.Commit(msg.Context(), msg.Message) // закоммитить нижележащий *Message
}
| Итератор | Возвращает |
|---|---|
TopicMessageIterator(ctx, r) | *topicreader.Message (сырой) |
BytesIterator(ctx, r) | *TypedTopicMessage[[]byte] (клонированный payload) |
StringIterator(ctx, r) | *TypedTopicMessage[string] |
JSONIterator[T](ctx, r) | *TypedTopicMessage[T] (декодирован из JSON) |
ProtobufIterator[T proto.Message](ctx, r) | *TypedTopicMessage[T] (декодирован из proto) |
IteratorFunc[T](ctx, r, customFn) | *TypedTopicMessage[T] (ваш unmarshal) |
Структура TypedTopicMessage[T] встраивает *topicreader.Message и добавляет типизированное поле Data T, так что у вас сохраняется полный доступ к Offset, CreatedAt, Context() и т. д.
10.3 Поддержка CDC (экспериментально)
YDB Change Data Capture пушит изменения строк как сообщения топика. В topicsugar есть хелперы:
type Row struct {
ID int64
Name string
}
type RowKey struct{ ID int64 }
// реализовать YDBCDCItem, чтобы хелпер знал, как разобрать ключ
func (r *Row) ParseCDCKey(...) error { /* ... */ }
for msg, err := range topicsugar.UnmarshalCDCStream[Row, RowKey](ctx, reader) {
if err != nil { break }
if msg.Erase != nil {
// строка удалена
} else if msg.NewImage != nil {
// строка обновлена/вставлена
}
}
Структура YDBCDCMessage[T, K] предоставляет Update, NewImage, OldImage, Key, Erase — позволяя единообразно обрабатывать insert/update/delete.
11. Сквозные паттерны использования
11.1 Pub-sub с consumer group
package main
import (
"bytes"
"context"
"fmt"
"io"
"log"
ydb "github.com/ydb-platform/ydb-go-sdk/v3"
"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicoptions"
"github.com/ydb-platform/ydb-go-sdk/v3/topic/topictypes"
"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicwriter"
)
func main() {
ctx := context.Background()
db, err := ydb.Open(ctx, "grpc://localhost:2136/local")
if err != nil { log.Fatal(err) }
defer db.Close(ctx)
// 1. Создать топик с потребителем
_ = db.Topic().Create(ctx, "/local/orders",
topicoptions.CreateWithSupportedCodecs(topictypes.CodecRaw, topictypes.CodecGzip),
topicoptions.CreateWithMinActivePartitions(3),
topicoptions.CreateWithConsumer(topictypes.Consumer{
Name: "billing",
SupportedCodecs: []topictypes.Codec{topictypes.CodecRaw, topictypes.CodecGzip},
}),
)
// 2. Producer
go func() {
w, err := db.Topic().StartWriter("/local/orders",
topicoptions.WithProducerID("checkout-svc"),
topicoptions.WithCodec(topictypes.CodecGzip),
)
if err != nil { log.Fatal(err) }
defer w.Close(ctx)
for i := 0; i < 100; i++ {
payload := fmt.Sprintf(`{"order_id": %d}`, i)
if err := w.Write(ctx, topicwriter.Message{Data: bytes.NewReader([]byte(payload))}); err != nil {
log.Fatal(err)
}
}
}()
// 3. Consumer
r, err := db.Topic().StartReader("billing", topicoptions.ReadTopic("/local/orders"))
if err != nil { log.Fatal(err) }
defer r.Close(ctx)
for {
msg, err := r.ReadMessage(ctx)
if err != nil { log.Fatal(err) }
data, _ := io.ReadAll(msg)
fmt.Printf("offset=%d body=%s\n", msg.Offset, data)
_ = r.Commit(msg.Context(), msg)
}
}
11.2 Событийно-управляемая обработка через listener
package main
import (
"context"
"io"
"log"
ydb "github.com/ydb-platform/ydb-go-sdk/v3"
"github.com/ydb-platform/ydb-go-sdk/v3/topic/topiclistener"
"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicoptions"
)
type OrderHandler struct {
topiclistener.BaseHandler // ОБЯЗАТЕЛЬНО — даёт дефолты и маркерный метод
}
func (h *OrderHandler) OnReadMessages(ctx context.Context, event *topiclistener.ReadMessages) error {
for _, m := range event.Batch.Messages {
data, err := io.ReadAll(m)
if err != nil { return err }
log.Printf("order msg p=%d off=%d body=%s", m.PartitionID(), m.Offset, data)
}
event.Confirm() // async-коммит
return nil
}
func main() {
ctx := context.Background()
db, err := ydb.Open(ctx, "grpc://localhost:2136/local")
if err != nil { log.Fatal(err) }
defer db.Close(ctx)
handler := &OrderHandler{}
l, err := db.Topic().StartListener("billing", handler, topicoptions.ReadTopic("/local/orders"))
if err != nil { log.Fatal(err) }
if err := l.WaitInit(ctx); err != nil { log.Fatal(err) }
// ... обработчик теперь работает в фоне
<-ctx.Done()
_ = l.Close(context.Background())
}
11.3 Типизированное чтение через итераторы (Go 1.23+)
type Event struct {
UserID string `json:"user_id"`
Action string `json:"action"`
}
r, _ := db.Topic().StartReader("analytics", topicoptions.ReadTopic("/local/events"))
defer r.Close(ctx)
for msg, err := range topicsugar.JSONIterator[Event](ctx, r) {
if err != nil {
log.Printf("стоп: %v", err)
break
}
process(msg.Data) // типизированный доступ
_ = r.Commit(msg.Context(), msg.Message) // коммит нижележащего *Message
}
12. Краткий справочник
12.1 Методы Client одним взглядом
| Метод | Возвращает | Блокирует? |
|---|---|---|
Create(ctx, path, opts) | error | да |
Alter(ctx, path, opts) | error | да |
Describe(ctx, path, opts) | (TopicDescription, error) | да |
DescribeTopicConsumer(ctx, path, consumer, opts) | (TopicConsumerDescription, error) | да |
CommitOffset(ctx, path, partID, consumer, offset, opts) | error | да |
Drop(ctx, path, opts) | error | да |
StartReader(consumer, sel, opts) | (*Reader, error) | нет (фоновое подключение) |
StartWriter(path, opts) | (*Writer, error) | нет |
StartTransactionalWriter(tx, path, opts) | (*TxWriter, error) | нет |
StartListener(consumer, handler, sel, opts) | (*TopicListener, error) | нет |
12.2 Каталог ошибок
| Ошибка | Где | Значение |
|---|---|---|
topicwriter.ErrQueueLimitExceed | Writer | Очередь полна, включён WithWriterErrOnQueueFull |
topicwriter.ErrMessagesPutToInternalQueueBeforeError | Writer | Часть сообщений попала в буфер до ошибки |
topicwriter.ErrInvalidConfiguration | Writer | Конфликтующие опции |
topicwriter.ErrUnimplemented | TxWriter | WaitInitInfo на multi-writer tx |
topicreader.ErrUnexpectedCodec | Reader.Message.Read | Неизвестный кодек на проводе |
topicreader.ErrConcurrencyCall* | Reader | Нарушение контракта конкурентности |
topicreader.ErrCommitToExpiredSession | Reader.Commit (sync) | Сессия переехала до ACK; не фатально |
12.3 Блок-схема выбора
Это руководство отражает состояние ydb-go-sdk/topic на ветке master в мае 2026. Экспериментальные API (транзакционные writer/reader, listener) могут эволюционировать; в случае сомнений сверяйтесь с VERSIONING.md.