Kafka producer в Go

Для Go есть несколько клиентов Kafka. В этом модуле будем использовать github.com/segmentio/kafka-go: он написан на Go, простой для чтения, хорошо подходит для учебных и production-сценариев, не требует CGO и удобно показывает базовую модель Kafka.

В больших компаниях также часто используют Sarama или Confluent Kafka Go client. Но для понимания producer logic важнее не конкретная библиотека, а решения: key, batch, compression, timeout, retries, delivery guarantees, schema evolution.

bash
go get github.com/segmentio/kafka-go

Минимальный producer

go
package main import ( "context" "encoding/json" "errors" "fmt" "log/slog" "os" "os/signal" "syscall" "time" "github.com/segmentio/kafka-go" ) type OrderCreated struct { EventID string `json:"event_id"` EventType string `json:"event_type"` Version int `json:"version"` Occurred time.Time `json:"occurred"` OrderID string `json:"order_id"` UserID string `json:"user_id"` Amount int64 `json:"amount"` Currency string `json:"currency"` } func main() { logger := slog.New(slog.NewTextHandler(os.Stdout, nil)) ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) defer stop() writer := &kafka.Writer{ Addr: kafka.TCP("localhost:9092"), Topic: "orders.events", Balancer: &kafka.Hash{}, RequiredAcks: kafka.RequireAll, Async: false, BatchSize: 100, BatchTimeout: 20 * time.Millisecond, Compression: kafka.Snappy, } defer func() { if err := writer.Close(); err != nil { logger.Error("close kafka writer", "error", err) } }() event := OrderCreated{ EventID: "evt-1001", EventType: "OrderCreated", Version: 1, Occurred: time.Now().UTC(), OrderID: "order-42", UserID: "user-7", Amount: 9900, Currency: "RUB", } if err := publishOrderCreated(ctx, writer, event); err != nil { logger.Error("publish event", "error", err) os.Exit(1) } logger.Info("event published", "event_id", event.EventID) } func publishOrderCreated(ctx context.Context, writer *kafka.Writer, event OrderCreated) error { payload, err := json.Marshal(event) if err != nil { return fmt.Errorf("marshal order created: %w", err) } publishCtx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() msg := kafka.Message{ Key: []byte(event.OrderID), Value: payload, Time: event.Occurred, Headers: []kafka.Header{ {Key: "event-type", Value: []byte(event.EventType)}, {Key: "schema-version", Value: []byte("1")}, {Key: "content-type", Value: []byte("application/json")}, }, } if err := writer.WriteMessages(publishCtx, msg); err != nil { if errors.Is(err, context.DeadlineExceeded) { return fmt.Errorf("publish timeout: %w", err) } return fmt.Errorf("write kafka message: %w", err) } return nil }

Главные детали:

  • Key: []byte(event.OrderID) сохраняет порядок событий одного заказа;
  • RequiredAcks: kafka.RequireAll соответствует acks=all;
  • BatchSize и BatchTimeout дают producer шанс объединить сообщения;
  • Compression снижает network/disk cost;
  • context.WithTimeout не даёт HTTP request или job зависнуть навсегда.

Key: бизнес-решение, а не техническая мелочь

Key выбирают по вопросу: "для какой сущности мне нужен порядок?"

СобытиеЧастый keyПричина
OrderCreated, OrderPaidorder_idЖизненный цикл заказа должен быть последовательным.
UserRegistered, UserEmailChangeduser_idСостояние пользователя обновляется по порядку.
PaymentAuthorizedpayment_idПлатёжная сущность живёт отдельно.
InventoryReservedsku или reservation_idЗависит от модели консистентности склада.

Если key не задан, producer распределяет records иначе: round-robin, least-bytes или другой balancer. Это может быть нормально для независимых логов, метрик или событий без требования порядка.


Batching и latency

Producer не обязан отправлять каждое сообщение отдельным network call. Он может собрать batch.

text
small writes: msg1 -> network msg2 -> network msg3 -> network batching: [msg1 msg2 msg3 ... msg100] -> network
НастройкаЧто делает
BatchSizeМаксимальное количество messages в batch.
BatchBytesМаксимальный размер batch.
BatchTimeoutСколько ждать заполнения batch перед отправкой.
AsyncВозвращать управление до подтверждения broker.

Большие batches увеличивают throughput и эффективность compression, но добавляют latency. Для user-facing запроса обычно важнее предсказуемая задержка. Для фонового экспорта событий можно дать producer больше времени на batch.

Async=true выглядит заманчиво, но для важных доменных событий это почти всегда плохой default: caller не получает ошибку доставки и не может связать результат publish с use case. Async producer уместен для best-effort telemetry, где потеря допустима и явно описана.


Compression

Kafka хорошо дружит с compression, потому что события часто похожи друг на друга. В kafka-go доступны разные codecs, например Snappy, Gzip, Lz4, Zstd в зависимости от версии клиента.

Практически:

  • Snappy - быстрый и простой выбор;
  • Gzip - сильнее сжимает, но дороже CPU;
  • Zstd - часто хороший современный баланс, если поддерживается всей инфраструктурой.

Compression применяется к batch. Поэтому batch size влияет не только на network, но и на ratio сжатия.


Retries и timeout

Retries нужны, потому что Kafka cluster может временно отвечать ошибками: leader election, network hiccup, broker rolling restart. Но retry без timeout превращается в зависание.

go
func publishWithRetry(ctx context.Context, writer *kafka.Writer, msg kafka.Message) error { var lastErr error for attempt := 1; attempt <= 3; attempt++ { attemptCtx, cancel := context.WithTimeout(ctx, 3*time.Second) err := writer.WriteMessages(attemptCtx, msg) cancel() if err == nil { return nil } if errors.Is(ctx.Err(), context.Canceled) { return ctx.Err() } lastErr = err select { case <-ctx.Done(): return ctx.Err() case <-time.After(time.Duration(attempt) * 200 * time.Millisecond): } } return fmt.Errorf("publish after retries: %w", lastErr) }

В реальном сервисе retry policy часто выносят в общий компонент, логируют попытки и различают retriable/non-retriable ошибки. Но принцип тот же: каждый retry ограничен context, а общий operation тоже должен иметь deadline.


Idempotence

Idempotent producer защищает от дублей при retry на уровне producer session: producer может повторить запись после сетевой ошибки, а broker понимает sequence numbers и не записывает duplicate.

В Java client это стандартная production-настройка: enable.idempotence=true. В Go-клиентах поддержка зависит от библиотеки и версии. Например, высокоуровневый segmentio/kafka-go Writer удобен, но не стоит проектировать систему так, будто он автоматически даёт все гарантии Java producer transactions/idempotence. Если клиент не даёт полноценный idempotent producer или вы не включили его явно, backend всё равно должен проектировать идемпотентные consumers через event_id, уникальные ключи, upsert и таблицы обработанных событий.

Важно различать:

  • idempotent producer уменьшает дубли при отправке;
  • idempotent consumer защищает систему, когда duplicate всё равно пришёл;
  • exactly-once в Kafka не отменяет необходимость думать о БД и внешних side effects.

Headers

Headers полезны для технической информации:

go
Headers: []kafka.Header{ {Key: "event-type", Value: []byte("OrderCreated")}, {Key: "schema-version", Value: []byte("1")}, {Key: "trace-id", Value: []byte(traceID)}, {Key: "correlation-id", Value: []byte(requestID)}, }

Не стоит класть в headers большие payloads. Headers часто нужны middleware, observability и routing внутри consumer, а не бизнес-объектам.

Headers также проходят через DLQ, retry topics и иногда попадают в логи. Не храните там access tokens, raw cookies, номера карт и другие секреты. Для трассировки достаточно trace/correlation id, которые не раскрывают бизнес-данные.


Schema и versioning

Kafka не знает, что лежит в Value. Для broker это bytes. Поэтому контракт события - ответственность команды.

Минимальный JSON-подход:

go
type EventEnvelope[T any] struct { ID string `json:"id"` Type string `json:"type"` Version int `json:"version"` Occurred time.Time `json:"occurred"` Data T `json:"data"` }

Правила эволюции:

  • добавляйте поля так, чтобы старые consumers могли их игнорировать;
  • не меняйте смысл существующего поля без новой версии;
  • не удаляйте поле, пока есть consumers, которые его читают;
  • добавляйте event_type и version явно;
  • для серьёзных систем используйте Schema Registry, Avro, Protobuf или JSON Schema.

Breaking change в событии обычно дороже breaking change в HTTP API: старые records могут replay через недели, а consumers обновляются не одновременно. Поэтому contract tests должны проверять не только новый payload, но и несколько старых примеров из fixtures.


Go client pitfalls

У Go Kafka-клиентов есть практические ловушки:

  • не создавайте writer на каждый request: переиспользуйте один writer на lifecycle сервиса, иначе потеряете batching и получите лишние connections;
  • закрывайте writer при shutdown, чтобы flush pending messages успел завершиться;
  • не используйте context.Background() внутри publish-функций: deadline должен приходить от caller или orchestration layer;
  • проверяйте частичные ошибки batch write, если клиент возвращает per-message результат;
  • задавайте ClientID, чтобы broker logs, quotas и метрики показывали понятный сервис;
  • держите topic names и group IDs в конфиге с fail-fast validation, а не в случайных строках по коду.

Producer в HTTP handler

Публикация в Kafka из handler может быть частью use case, но не должна ломать transaction boundary. Если событие должно строго соответствовать записи в БД, используйте outbox pattern: сначала transaction в БД, потом отдельный publisher читает outbox и отправляет Kafka.

text
HTTP request | v DB transaction: INSERT orders INSERT outbox_events COMMIT | v background publisher -> Kafka

Прямой WriteMessages после INSERT orders может создать рассинхрон: заказ сохранён, Kafka недоступна, событие потеряно. Или наоборот: событие ушло, transaction откатилась.


Вопросы на собеседовании

  • Как producer выбирает partition?
  • Почему key - это часть доменной модели?
  • Что дают batching и compression?
  • Почему producer operation должен иметь context timeout?
  • Что такое idempotent producer?
  • Почему idempotent producer не заменяет идемпотентность consumer?
  • Зачем headers в Kafka message?
  • Как версионировать события?
  • Почему outbox pattern часто лучше прямой отправки в Kafka из transaction-sensitive use case?

Практика

  1. Напишите Go-структуру UserEmailChanged и producer-функцию, которая публикует событие с key=user_id.
  2. Добавьте в сообщение headers: event-type, schema-version, trace-id.
  3. Подберите настройки batch/compression для двух сценариев: интерактивный API и фоновый экспорт.
  4. Опишите, как вы защититесь от рассинхрона "БД записалась, Kafka не приняла событие".

Интерактивная практика

Quiz+10 XP

Зачем producer-сообщению обычно нужен key?

  • Чтобы Kafka автоматически сжимала только эти сообщения
  • Чтобы consumer group могла читать без offset
  • Чтобы связанные события попадали в одну partition и сохраняли порядок
  • Чтобы сообщение удалялось сразу после чтения
Predict+15 XP

Что выведет этот код?

go
package main import "fmt" func producerMode(batchSize int) string { if batchSize <= 1 { return "low-latency" } return "throughput" } func main() { fmt.Println(producerMode(1)) fmt.Println(producerMode(100)) }
Задача+20 XP

Реализуй PublishPattern: если событие связано с транзакционной записью в БД, выбирай outbox; если это best-effort telemetry без бизнес-транзакции — direct.