Kafka producer в Go
Для Go есть несколько клиентов Kafka. В этом модуле будем использовать github.com/segmentio/kafka-go: он написан на Go, простой для чтения, хорошо подходит для учебных и production-сценариев, не требует CGO и удобно показывает базовую модель Kafka.
В больших компаниях также часто используют Sarama или Confluent Kafka Go client. Но для понимания producer logic важнее не конкретная библиотека, а решения: key, batch, compression, timeout, retries, delivery guarantees, schema evolution.
go get github.com/segmentio/kafka-go
Минимальный producer
package main
import (
"context"
"encoding/json"
"errors"
"fmt"
"log/slog"
"os"
"os/signal"
"syscall"
"time"
"github.com/segmentio/kafka-go"
)
type OrderCreated struct {
EventID string `json:"event_id"`
EventType string `json:"event_type"`
Version int `json:"version"`
Occurred time.Time `json:"occurred"`
OrderID string `json:"order_id"`
UserID string `json:"user_id"`
Amount int64 `json:"amount"`
Currency string `json:"currency"`
}
func main() {
logger := slog.New(slog.NewTextHandler(os.Stdout, nil))
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer stop()
writer := &kafka.Writer{
Addr: kafka.TCP("localhost:9092"),
Topic: "orders.events",
Balancer: &kafka.Hash{},
RequiredAcks: kafka.RequireAll,
Async: false,
BatchSize: 100,
BatchTimeout: 20 * time.Millisecond,
Compression: kafka.Snappy,
}
defer func() {
if err := writer.Close(); err != nil {
logger.Error("close kafka writer", "error", err)
}
}()
event := OrderCreated{
EventID: "evt-1001",
EventType: "OrderCreated",
Version: 1,
Occurred: time.Now().UTC(),
OrderID: "order-42",
UserID: "user-7",
Amount: 9900,
Currency: "RUB",
}
if err := publishOrderCreated(ctx, writer, event); err != nil {
logger.Error("publish event", "error", err)
os.Exit(1)
}
logger.Info("event published", "event_id", event.EventID)
}
func publishOrderCreated(ctx context.Context, writer *kafka.Writer, event OrderCreated) error {
payload, err := json.Marshal(event)
if err != nil {
return fmt.Errorf("marshal order created: %w", err)
}
publishCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
msg := kafka.Message{
Key: []byte(event.OrderID),
Value: payload,
Time: event.Occurred,
Headers: []kafka.Header{
{Key: "event-type", Value: []byte(event.EventType)},
{Key: "schema-version", Value: []byte("1")},
{Key: "content-type", Value: []byte("application/json")},
},
}
if err := writer.WriteMessages(publishCtx, msg); err != nil {
if errors.Is(err, context.DeadlineExceeded) {
return fmt.Errorf("publish timeout: %w", err)
}
return fmt.Errorf("write kafka message: %w", err)
}
return nil
}
Главные детали:
Key: []byte(event.OrderID)сохраняет порядок событий одного заказа;RequiredAcks: kafka.RequireAllсоответствуетacks=all;BatchSizeиBatchTimeoutдают producer шанс объединить сообщения;Compressionснижает network/disk cost;context.WithTimeoutне даёт HTTP request или job зависнуть навсегда.
Key: бизнес-решение, а не техническая мелочь
Key выбирают по вопросу: "для какой сущности мне нужен порядок?"
| Событие | Частый key | Причина |
|---|---|---|
OrderCreated, OrderPaid | order_id | Жизненный цикл заказа должен быть последовательным. |
UserRegistered, UserEmailChanged | user_id | Состояние пользователя обновляется по порядку. |
PaymentAuthorized | payment_id | Платёжная сущность живёт отдельно. |
InventoryReserved | sku или reservation_id | Зависит от модели консистентности склада. |
Если key не задан, producer распределяет records иначе: round-robin, least-bytes или другой balancer. Это может быть нормально для независимых логов, метрик или событий без требования порядка.
Batching и latency
Producer не обязан отправлять каждое сообщение отдельным network call. Он может собрать batch.
small writes:
msg1 -> network
msg2 -> network
msg3 -> network
batching:
[msg1 msg2 msg3 ... msg100] -> network
| Настройка | Что делает |
|---|---|
BatchSize | Максимальное количество messages в batch. |
BatchBytes | Максимальный размер batch. |
BatchTimeout | Сколько ждать заполнения batch перед отправкой. |
Async | Возвращать управление до подтверждения broker. |
Большие batches увеличивают throughput и эффективность compression, но добавляют latency. Для user-facing запроса обычно важнее предсказуемая задержка. Для фонового экспорта событий можно дать producer больше времени на batch.
Async=true выглядит заманчиво, но для важных доменных событий это почти всегда плохой default: caller не получает ошибку доставки и не может связать результат publish с use case. Async producer уместен для best-effort telemetry, где потеря допустима и явно описана.
Compression
Kafka хорошо дружит с compression, потому что события часто похожи друг на друга. В kafka-go доступны разные codecs, например Snappy, Gzip, Lz4, Zstd в зависимости от версии клиента.
Практически:
- Snappy - быстрый и простой выбор;
- Gzip - сильнее сжимает, но дороже CPU;
- Zstd - часто хороший современный баланс, если поддерживается всей инфраструктурой.
Compression применяется к batch. Поэтому batch size влияет не только на network, но и на ratio сжатия.
Retries и timeout
Retries нужны, потому что Kafka cluster может временно отвечать ошибками: leader election, network hiccup, broker rolling restart. Но retry без timeout превращается в зависание.
func publishWithRetry(ctx context.Context, writer *kafka.Writer, msg kafka.Message) error {
var lastErr error
for attempt := 1; attempt <= 3; attempt++ {
attemptCtx, cancel := context.WithTimeout(ctx, 3*time.Second)
err := writer.WriteMessages(attemptCtx, msg)
cancel()
if err == nil {
return nil
}
if errors.Is(ctx.Err(), context.Canceled) {
return ctx.Err()
}
lastErr = err
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(time.Duration(attempt) * 200 * time.Millisecond):
}
}
return fmt.Errorf("publish after retries: %w", lastErr)
}
В реальном сервисе retry policy часто выносят в общий компонент, логируют попытки и различают retriable/non-retriable ошибки. Но принцип тот же: каждый retry ограничен context, а общий operation тоже должен иметь deadline.
Idempotence
Idempotent producer защищает от дублей при retry на уровне producer session: producer может повторить запись после сетевой ошибки, а broker понимает sequence numbers и не записывает duplicate.
В Java client это стандартная production-настройка: enable.idempotence=true. В Go-клиентах поддержка зависит от библиотеки и версии. Например, высокоуровневый segmentio/kafka-go Writer удобен, но не стоит проектировать систему так, будто он автоматически даёт все гарантии Java producer transactions/idempotence. Если клиент не даёт полноценный idempotent producer или вы не включили его явно, backend всё равно должен проектировать идемпотентные consumers через event_id, уникальные ключи, upsert и таблицы обработанных событий.
Важно различать:
- idempotent producer уменьшает дубли при отправке;
- idempotent consumer защищает систему, когда duplicate всё равно пришёл;
- exactly-once в Kafka не отменяет необходимость думать о БД и внешних side effects.
Headers
Headers полезны для технической информации:
Headers: []kafka.Header{
{Key: "event-type", Value: []byte("OrderCreated")},
{Key: "schema-version", Value: []byte("1")},
{Key: "trace-id", Value: []byte(traceID)},
{Key: "correlation-id", Value: []byte(requestID)},
}
Не стоит класть в headers большие payloads. Headers часто нужны middleware, observability и routing внутри consumer, а не бизнес-объектам.
Headers также проходят через DLQ, retry topics и иногда попадают в логи. Не храните там access tokens, raw cookies, номера карт и другие секреты. Для трассировки достаточно trace/correlation id, которые не раскрывают бизнес-данные.
Schema и versioning
Kafka не знает, что лежит в Value. Для broker это bytes. Поэтому контракт события - ответственность команды.
Минимальный JSON-подход:
type EventEnvelope[T any] struct {
ID string `json:"id"`
Type string `json:"type"`
Version int `json:"version"`
Occurred time.Time `json:"occurred"`
Data T `json:"data"`
}
Правила эволюции:
- добавляйте поля так, чтобы старые consumers могли их игнорировать;
- не меняйте смысл существующего поля без новой версии;
- не удаляйте поле, пока есть consumers, которые его читают;
- добавляйте
event_typeиversionявно; - для серьёзных систем используйте Schema Registry, Avro, Protobuf или JSON Schema.
Breaking change в событии обычно дороже breaking change в HTTP API: старые records могут replay через недели, а consumers обновляются не одновременно. Поэтому contract tests должны проверять не только новый payload, но и несколько старых примеров из fixtures.
Go client pitfalls
У Go Kafka-клиентов есть практические ловушки:
- не создавайте writer на каждый request: переиспользуйте один writer на lifecycle сервиса, иначе потеряете batching и получите лишние connections;
- закрывайте writer при shutdown, чтобы flush pending messages успел завершиться;
- не используйте
context.Background()внутри publish-функций: deadline должен приходить от caller или orchestration layer; - проверяйте частичные ошибки batch write, если клиент возвращает per-message результат;
- задавайте
ClientID, чтобы broker logs, quotas и метрики показывали понятный сервис; - держите topic names и group IDs в конфиге с fail-fast validation, а не в случайных строках по коду.
Producer в HTTP handler
Публикация в Kafka из handler может быть частью use case, но не должна ломать transaction boundary. Если событие должно строго соответствовать записи в БД, используйте outbox pattern: сначала transaction в БД, потом отдельный publisher читает outbox и отправляет Kafka.
HTTP request
|
v
DB transaction:
INSERT orders
INSERT outbox_events
COMMIT
|
v
background publisher -> Kafka
Прямой WriteMessages после INSERT orders может создать рассинхрон: заказ сохранён, Kafka недоступна, событие потеряно. Или наоборот: событие ушло, transaction откатилась.
Вопросы на собеседовании
- Как producer выбирает partition?
- Почему key - это часть доменной модели?
- Что дают batching и compression?
- Почему producer operation должен иметь context timeout?
- Что такое idempotent producer?
- Почему idempotent producer не заменяет идемпотентность consumer?
- Зачем headers в Kafka message?
- Как версионировать события?
- Почему outbox pattern часто лучше прямой отправки в Kafka из transaction-sensitive use case?
Практика
- Напишите Go-структуру
UserEmailChangedи producer-функцию, которая публикует событие с key=user_id. - Добавьте в сообщение headers:
event-type,schema-version,trace-id. - Подберите настройки batch/compression для двух сценариев: интерактивный API и фоновый экспорт.
- Опишите, как вы защититесь от рассинхрона "БД записалась, Kafka не приняла событие".
Интерактивная практика
Зачем producer-сообщению обычно нужен key?
Что выведет этот код?
package main
import "fmt"
func producerMode(batchSize int) string {
if batchSize <= 1 {
return "low-latency"
}
return "throughput"
}
func main() {
fmt.Println(producerMode(1))
fmt.Println(producerMode(100))
}
Реализуй PublishPattern: если событие связано с транзакционной записью в БД, выбирай outbox; если это best-effort telemetry без бизнес-транзакции — direct.