Паттерны конкурентности в Go

Это финальная статья блока по конкурентности. Горутины, каналы, sync-примитивы, контекст и планировщик — всё это инструменты. Паттерны — это проверенные способы их комбинировать для решения типовых задач. Здесь не будет новых концепций, только практика: берём то что знаем и складываем в рабочие конструкции.


Pipeline — конвейер обработки

Pipeline — последовательность стадий, где каждая стадия читает данные из входного канала, трансформирует их и отдаёт в выходной. Стадии работают конкурентно — пока одна обрабатывает текущий элемент, следующая уже работает с предыдущим:

go
// Стадия 1: генератор — преобразует слайс в канал func generate(nums []int) <-chan int { out := make(chan int) go func() { defer close(out) for _, n := range nums { out <- n } }() return out } // Стадия 2: возведение в квадрат func square(in <-chan int) <-chan int { out := make(chan int) go func() { defer close(out) for n := range in { out <- n * n } }() return out } // Стадия 3: фильтрация чётных func filterEven(in <-chan int) <-chan int { out := make(chan int) go func() { defer close(out) for n := range in { if n%2 == 0 { out <- n } } }() return out } func main() { // Собираем конвейер nums := generate([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}) squared := square(nums) filtered := filterEven(squared) for result := range filtered { fmt.Println(result) // 4, 16, 36, 64, 100 } }

Каждая стадия — отдельная функция, возвращающая канал. Стадии соединяются как unix-пайпы: cat file | grep pattern | wc -l. Добавление новой стадии не ломает существующие.

Pipeline с отменой через Context

Без контекста pipeline не остановить снаружи — горутины зависнут если потребитель перестанет читать:

go
func generateCtx(ctx context.Context, nums []int) <-chan int { out := make(chan int) go func() { defer close(out) for _, n := range nums { select { case out <- n: case <-ctx.Done(): // прерываемся если контекст отменён return } } }() return out } func squareCtx(ctx context.Context, in <-chan int) <-chan int { out := make(chan int) go func() { defer close(out) for n := range in { select { case out <- n * n: case <-ctx.Done(): return } } }() return out } func main() { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() nums := generateCtx(ctx, []int{1, 2, 3, 4, 5}) squared := squareCtx(ctx, nums) for result := range squared { fmt.Println(result) } }

Правило: каждая стадия pipeline должна поддерживать отмену через context — иначе при отмене горутины зависнут, заблокированные на отправке в канал, которого никто не читает.


Fan-out / Fan-in

Fan-out — распределение работы между несколькими горутинами из одного источника. Fan-in — объединение результатов из нескольких каналов в один. Вместе они реализуют параллельную обработку с агрегацией результатов:

go
// Fan-out: запускаем N воркеров, каждый читает из одного канала func fanOut(ctx context.Context, in <-chan Job, workers int) []<-chan Result { channels := make([]<-chan Result, workers) for i := 0; i < workers; i++ { channels[i] = worker(ctx, in) } return channels } func worker(ctx context.Context, in <-chan Job) <-chan Result { out := make(chan Result) go func() { defer close(out) for job := range in { select { case <-ctx.Done(): return case out <- process(job): } } }() return out } // Fan-in: объединяем несколько каналов в один func fanIn(ctx context.Context, channels ...<-chan Result) <-chan Result { merged := make(chan Result) var wg sync.WaitGroup // Для каждого входного канала запускаем горутину-форвардер forward := func(ch <-chan Result) { defer wg.Done() for result := range ch { select { case merged <- result: case <-ctx.Done(): return } } } wg.Add(len(channels)) for _, ch := range channels { go forward(ch) } // Закрываем merged когда все форвардеры завершились go func() { wg.Wait() close(merged) }() return merged } // Использование func main() { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() jobs := generateJobs(ctx, 1000) // Fan-out: 5 воркеров параллельно обрабатывают задачи workerChannels := fanOut(ctx, jobs, 5) // Fan-in: собираем все результаты в один канал results := fanIn(ctx, workerChannels...) for result := range results { fmt.Println(result) } }

Fan-out/fan-in эффективен когда задачи независимы и каждая занимает примерно одинаковое время. Если время сильно варьируется — лучше worker pool.


Worker Pool — пул воркеров

Worker pool — фиксированное количество горутин, обрабатывающих очередь задач. В отличие от fan-out, количество горутин не зависит от количества задач:

go
type Job struct { ID int Data string } type Result struct { JobID int Output string Err error } func workerPool( ctx context.Context, numWorkers int, jobs <-chan Job, ) <-chan Result { results := make(chan Result, numWorkers) var wg sync.WaitGroup for i := 0; i < numWorkers; i++ { wg.Add(1) go func(workerID int) { defer wg.Done() for { select { case job, ok := <-jobs: if !ok { return // канал закрыт — завершаем воркер } output, err := processJob(job) results <- Result{ JobID: job.ID, Output: output, Err: err, } case <-ctx.Done(): return } } }(i) } // Закрываем results когда все воркеры завершились go func() { wg.Wait() close(results) }() return results } func main() { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() // Создаём задачи jobs := make(chan Job, 100) go func() { defer close(jobs) for i := 0; i < 1000; i++ { jobs <- Job{ID: i, Data: fmt.Sprintf("data-%d", i)}\n } }() // Запускаем пул из 10 воркеров results := workerPool(ctx, 10, jobs) // Собираем результаты var errors []error for result := range results { if result.Err != nil { errors = append(errors, result.Err) continue } fmt.Println(result.Output) } }

Динамический worker pool

Иногда нужно регулировать количество воркеров в рантайме. Простой способ — семафор:

go
type DynamicPool struct { sem chan struct{} } func NewDynamicPool(maxWorkers int) *DynamicPool { return &DynamicPool{ sem: make(chan struct{}, maxWorkers), } } func (p *DynamicPool) Submit(ctx context.Context, fn func()) error { select { case p.sem <- struct{}{}: // захватываем слот case <-ctx.Done(): return ctx.Err() } go func() { defer func() { <-p.sem }() // освобождаем слот fn() }() return nil } // Использование pool := NewDynamicPool(10) // максимум 10 одновременных задач for i := 0; i < 1000; i++ { i := i if err := pool.Submit(ctx, func() { processItem(i) }); err != nil { break // контекст отменён } }

Semaphore — ограничение параллелизма

Семафор — универсальный инструмент для ограничения количества одновременных операций. В Go идиоматично реализуется через буферизованный канал:

go
// Ограничиваем до 5 одновременных запросов к внешнему API sem := make(chan struct{}, 5) var wg sync.WaitGroup for _, url := range urls { wg.Add(1) go func(u string) { defer wg.Done() sem <- struct{}{} // захватываем слот (блокируется если занято) defer func() { <-sem }() // освобождаем слот fetch(u) }(url) } wg.Wait()

Для production-кода лучше использовать golang.org/x/sync/semaphore — он поддерживает взятие нескольких слотов за раз и отмену через контекст:

go
import "golang.org/x/sync/semaphore" sem := semaphore.NewWeighted(5) // максимум 5 единиц for _, url := range urls { // Захватываем 1 единицу с поддержкой отмены if err := sem.Acquire(ctx, 1); err != nil { break // контекст отменён } go func(u string) { defer sem.Release(1) fetch(u) }(url) } // Ждём завершения всех горутин sem.Acquire(ctx, 5)

errgroup — горутины с обработкой ошибок

sync.WaitGroup не умеет обрабатывать ошибки из горутин. golang.org/x/sync/errgroup — его расширение с поддержкой ошибок и контекста:

go
import "golang.org/x/sync/errgroup" func fetchAll(ctx context.Context, urls []string) ([]string, error) { g, ctx := errgroup.WithContext(ctx) results := make([]string, len(urls)) for i, url := range urls { i, url := i, url // захватываем переменные g.Go(func() error { resp, err := fetchURL(ctx, url) if err != nil { return fmt.Errorf("fetch %s: %w", url, err) } results[i] = resp // безопасно: каждый пишет в свой индекс return nil }) } // Ждём все горутины. Возвращает первую ошибку если была if err := g.Wait(); err != nil { return nil, err } return results, nil }

errgroup.WithContext возвращает контекст, который автоматически отменяется при первой ошибке в любой горутине. Остальные горутины должны проверять ctx.Done() чтобы корректно завершиться.

errgroup с ограничением параллелизма

go
g, ctx := errgroup.WithContext(ctx) g.SetLimit(10) // максимум 10 горутин одновременно for _, job := range jobs { job := job g.Go(func() error { // блокируется если уже 10 горутин работает return process(ctx, job) }) } if err := g.Wait(); err != nil { return err }

SetLimit появился в Go 1.20 — встроенный семафор в errgroup, не нужно отдельно управлять.


Timeout и Retry

Два паттерна, которые часто нужны вместе при работе с внешними сервисами:

go
// Timeout: ограничение времени на операцию func withTimeout(ctx context.Context, timeout time.Duration, fn func(ctx context.Context) error) error { ctx, cancel := context.WithTimeout(ctx, timeout) defer cancel() return fn(ctx) } // Retry с экспоненциальным backoff func withRetry(ctx context.Context, attempts int, fn func() error) error { var err error for i := 0; i < attempts; i++ { if err = fn(); err == nil { return nil } // Не ретраим если контекст отменён if ctx.Err() != nil { return ctx.Err() } // Экспоненциальный backoff: 100ms, 200ms, 400ms... backoff := time.Duration(1<<uint(i)) * 100 * time.Millisecond select { case <-time.After(backoff): case <-ctx.Done(): return ctx.Err() } } return fmt.Errorf("все %d попыток исчерпаны: %w", attempts, err) } // Комбинируем func callAPI(ctx context.Context) error { return withRetry(ctx, 3, func() error { return withTimeout(ctx, 2*time.Second, func(ctx context.Context) error { return httpCall(ctx) }) }) }

Or-Done — чтение с отменой

Когда нужно читать из канала с возможностью прерывания через Done — повторяющийся select можно вынести в хелпер:

go
// orDone оборачивает чтение из канала с поддержкой отмены func orDone(ctx context.Context, ch <-chan int) <-chan int { out := make(chan int) go func() { defer close(out) for { select { case <-ctx.Done(): return case v, ok := <-ch: if !ok { return } select { case out <- v: case <-ctx.Done(): return } } } }() return out } // Использование — вместо повторяющегося select везде for val := range orDone(ctx, source) { process(val) }

Tee — разветвление канала

Иногда нужно направить данные из одного канала в два потребителя:

go
func tee(ctx context.Context, in <-chan int) (<-chan int, <-chan int) { out1 := make(chan int) out2 := make(chan int) go func() { defer close(out1) defer close(out2) for val := range orDone(ctx, in) { // Локальные копии для двух select o1, o2 := out1, out2 // Отправляем в оба канала — используем два select // чтобы не зависеть от порядка готовности for i := 0; i < 2; i++ { select { case o1 <- val: o1 = nil // уже отправили, исключаем из следующего select case o2 <- val: o2 = nil } } } }() return out1, out2 } // Использование source := generate(ctx, data) forLogger, forProcessor := tee(ctx, source) go logAll(forLogger) processAll(forProcessor)

Выбор паттерна

На практике выбор между паттернами определяется характером задачи:

ЗадачаПаттерн
Последовательная трансформация данныхPipeline
Один источник, много обработчиковFan-out
Много источников, один потребительFan-in
Очередь задач с ограниченным параллелизмомWorker Pool
Ограничить конкурентность существующего кодаSemaphore
Параллельные задачи с обработкой ошибокerrgroup
Внешние вызовы с ненадёжным сервисомTimeout + Retry

На практике паттерны комбинируются: типичный backend-сервис использует worker pool для обработки входящих запросов, errgroup для параллельных запросов к зависимостям, semaphore для ограничения нагрузки на БД, и pipeline для трансформации данных внутри обработчика.


Вопросы на собеседовании

Q: Что такое pipeline? В чём его преимущества?
A: Последовательность стадий обработки, соединённых каналами. Каждая стадия — независимая горутина, читает из входного канала и пишет в выходной. Стадии работают конкурентно — пока одна обрабатывает текущий элемент, другая уже работает со следующим. Преимущества: модульность (стадии независимы), конкурентность без явной синхронизации, естественная поддержка backpressure.

Q: Чем fan-out отличается от worker pool?
A: В fan-out каждый воркер получает свой канал — источник данных копируется или распределяется по воркерам, количество воркеров фиксировано при создании. В worker pool все воркеры читают из одного общего канала задач — естественная балансировка нагрузки, медленный воркер просто берёт меньше задач. Worker pool предпочтительнее когда время обработки задач варьируется.

Q: Как реализовать семафор в Go?
A: Через буферизованный канал: sem := make(chan struct{}, N). Захват: sem <- struct{}{}, освобождение: <-sem. Блокируется когда N слотов заняты. Для production лучше golang.org/x/sync/semaphore с поддержкой контекста и взятия нескольких единиц.

Q: Чем errgroup лучше WaitGroup?
A: errgroup добавляет обработку ошибок из горутин — Wait() возвращает первую ошибку. С errgroup.WithContext контекст автоматически отменяется при первой ошибке, сигнализируя остальным горутинам. SetLimit (Go 1.20) добавляет встроенный семафор. WaitGroup только ждёт завершения без обработки ошибок.

Q: Почему важно поддерживать контекст в каждой стадии pipeline?
A: Без контекста при отмене потребитель перестаёт читать из канала, и горутины-стадии зависают, заблокированные на отправке — утечка горутин. С ctx.Done() в каждом select горутины корректно завершаются при отмене контекста.

Q: Как реализовать retry с экспоненциальным backoff?
A: Цикл на N попыток, между попытками — time.After(backoff) в select с ctx.Done(). Backoff удваивается каждую итерацию: time.Duration(1<<uint(i)) * baseDelay. Обязательно проверять ctx.Err() чтобы не ретраить при отменённом контексте.

Q: Как безопасно писать результаты из нескольких горутин в один слайс?
A: Если каждая горутина пишет в свой индекс — синхронизация не нужна, горутины не конкурируют за одну ячейку. Именно так работает errgroup с results[i] = respi уникален для каждой горутины. Если индексы пересекаются — нужен мьютекс или канал результатов.

Q: Что такое backpressure и как pipeline его обеспечивает?
A: Backpressure — механизм при котором быстрый производитель замедляется если потребитель не успевает. В pipeline это происходит автоматически: если стадия N медленная, её входной канал заполнен, предыдущая стадия блокируется на отправке и тоже замедляется. Цепочка блокировок распространяется до источника.


Задачи: Паттерны конкурентности


Задача 1: Merge каналов

Уровень: Лёгкая

Что проверяет: fan-in, работа с несколькими каналами

Условие: Напиши функцию merge(channels ...<-chan int) <-chan int которая объединяет несколько каналов в один. Результирующий канал закрывается когда все входные каналы закрыты.

Решение:

go
package main import ( "fmt" "sync" ) func merge(channels ...<-chan int) <-chan int { out := make(chan int) var wg sync.WaitGroup forward := func(ch <-chan int) { defer wg.Done() for v := range ch { out <- v } } wg.Add(len(channels)) for _, ch := range channels { go forward(ch) } go func() { wg.Wait() close(out) }() return out } func makeChannel(vals ...int) <-chan int { ch := make(chan int) go func() { defer close(ch) for _, v := range vals { ch <- v } }() return ch } func main() { ch1 := makeChannel(1, 2, 3) ch2 := makeChannel(4, 5, 6) ch3 := makeChannel(7, 8, 9) for v := range merge(ch1, ch2, ch3) { fmt.Print(v, " ") // порядок непредсказуем, но все числа будут\n } }

Задача 2: Retry с errgroup

Уровень: Средняя

Что проверяет: errgroup, обработка ошибок из нескольких горутин

Условие: Напиши функцию fetchAll(ctx context.Context, urls []string) ([]string, error) которая конкурентно загружает все URL (имитируй через time.Sleep + возврат строки), но не более 3 одновременно. При ошибке хотя бы одного — отменяет остальные и возвращает ошибку.

Решение:

go
package main import ( "context" "fmt" "time" "golang.org/x/sync/errgroup" ) // Имитация загрузки URL func fetchURL(ctx context.Context, url string) (string, error) { select { case <-time.After(100 * time.Millisecond): return "content of " + url, nil case <-ctx.Done(): return "", ctx.Err() } } func fetchAll(ctx context.Context, urls []string) ([]string, error) { g, ctx := errgroup.WithContext(ctx) g.SetLimit(3) // не более 3 одновременно results := make([]string, len(urls)) for i, url := range urls { i, url := i, url g.Go(func() error { content, err := fetchURL(ctx, url) if err != nil { return fmt.Errorf("fetch %s: %w", url, err) } results[i] = content return nil }) } if err := g.Wait(); err != nil { return nil, err } return results, nil } func main() { ctx := context.Background() urls := []string{ "https://example.com/1", "https://example.com/2", "https://example.com/3", "https://example.com/4", "https://example.com/5", } results, err := fetchAll(ctx, urls) if err != nil { fmt.Println("error:", err)\n return } for _, r := range results { fmt.Println(r) } }

Задача 3: Rate limiter

Уровень: Сложная

Что проверяет: комбинирование паттернов, ticker, семафор

Условие: Реализуй RateLimiter который позволяет не более N запросов в секунду. Метод Allow(ctx context.Context) error блокируется если лимит исчерпан, и возвращает nil когда можно выполнить запрос. Если контекст отменён — возвращает ошибку.

Подсказка: Используй буферизованный канал как токен-bucket. Ticker пополняет токены с нужной частотой.

Решение:

go
package main import ( "context" "fmt" "time" ) type RateLimiter struct { tokens chan struct{} } func NewRateLimiter(rps int) *RateLimiter { rl := &RateLimiter{ tokens: make(chan struct{}, rps), } // Заполняем начальные токены for i := 0; i < rps; i++ { rl.tokens <- struct{}{} } // Пополняем токены с частотой rps/сек go func() { ticker := time.NewTicker(time.Second / time.Duration(rps)) defer ticker.Stop() for range ticker.C { select { case rl.tokens <- struct{}{}: // добавляем токен если есть место default: // буфер полон — пропускаем } } }() return rl } func (rl *RateLimiter) Allow(ctx context.Context) error { select { case <-rl.tokens: // берём токен return nil case <-ctx.Done(): return ctx.Err() } } func main() { limiter := NewRateLimiter(3) // 3 запроса в секунду ctx := context.Background() for i := 0; i < 9; i++ { if err := limiter.Allow(ctx); err != nil { fmt.Println("cancelled:", err)\n return } fmt.Printf("запрос %d выполнен в %s\n", i+1, time.Now().Format("15:04:05.000")) } } // Первые 3 выполняются сразу (начальные токены). // Следующие — по 3 в секунду.

Викторина: какой паттерн выбрать?

Для каждой ситуации выбери паттерн и объясни почему именно он.


Ситуация 1

Нужно скачать 1000 изображений с внешнего сервиса. Сервис не выдерживает больше 10 одновременных запросов.

Ответ: Worker Pool или Semaphore.

text
Semaphore: ограничиваем конкурентность до 10. Запускаем горутину на каждое изображение, но одновременно работают не более 10. Проще реализовать если не нужна очередь и переиспользование воркеров. Worker Pool: если изображений постоянный поток (стрим), а не фиксированный список — pool эффективнее: воркеры живут долго и берут задачи из общей очереди.

Ситуация 2

Нужно читать строки из файла, парсить каждую как JSON, валидировать и сохранять в базу данных. Чтение быстрое, парсинг медленный, запись в БД ещё медленнее.

Ответ: Pipeline.

text
Три стадии с разной скоростью — классический Pipeline. Стадия 1: читаем строки → канал. Стадия 2: парсим JSON конкурентно (fan-out на несколько парсеров). Стадия 3: пишем в БД (с ограничением через semaphore). Каждая стадия работает независимо, backpressure возникает автоматически через каналы.

Ситуация 3

Сервис обращается к трём источникам данных (Redis, PostgreSQL, внешний API) и возвращает агрегированный результат. Все три запроса независимы.

Ответ: errgroup (Fan-out + сбор результатов).

text
Три независимых запроса — запускаем параллельно через errgroup. При ошибке любого — контекст отменяется, остальные прерываются. Wait() возвращает первую ошибку. Результаты пишем в индексированный слайс — без мьютекса, каждая горутина пишет в свой индекс.

Ситуация 4

Нужно отправить уведомление через три канала (email, SMS, push). Достаточно чтобы хотя бы один канал сработал успешно — остальные можно отменить.

Ответ: Fan-out + first-result-wins через контекст.

text
Запускаем три горутины параллельно. Первая успешная пишет в буферизованный канал. Select читает первый результат и вызывает cancel() — остальные горутины получают ctx.Done() и завершаются. Паттерн "fastest" из задачи на Context.

Ситуация 5

Нужно агрегировать метрики из 50 микросервисов каждые 30 секунд. Каждый опрос занимает до 2 секунд.

Ответ: Fan-out + Fan-in с таймаутом.

text
Fan-out: запускаем 50 горутин параллельно. Fan-in: собираем результаты в один канал. Context с таймаутом 2 секунды — медленные сервисы не задерживают агрегацию. Частичный результат (только те кто успел) лучше чем ждать всех.

Ситуация 6

Нужно обрабатывать события из Kafka. Каждое событие независимо. Хочется максимальной пропускной способности но не потерять события при завершении.

Ответ: Worker Pool с graceful shutdown.

text
Worker Pool: фиксированное количество воркеров читают из общей очереди (канала куда пишет Kafka-консьюмер). При завершении: закрываем канал задач, воркеры дочитывают оставшиеся события и завершаются. WaitGroup ждёт всех воркеров перед остановкой процесса.

Ситуация 7

Нужно логировать каждый запрос к API но запись в файл медленная и не должна блокировать обработчик.

Ответ: Буферизованный канал как асинхронная очередь.

text
Обработчик пишет в буферизованный канал (неблокирующий для большинства случаев). Отдельная горутина-воркер читает из канала и пишет в файл. Если канал полон (буфер переполнен) — можно дропать логи или блокироваться в зависимости от требований.

Ситуация 8

Нужно реализовать поиск по нескольким источникам (ElasticSearch, PostgreSQL full-text, внешний API). Возвращать объединённые результаты всех источников.

Ответ: Fan-out + Fan-in.

text
Fan-out: три горутины параллельно делают поиск. Fan-in: merge результатов из трёх каналов в один. Дедупликация результатов уже в основной горутине. errgroup для обработки ошибок — если один источник упал, всё равно возвращаем результаты от остальных (игнорируем ошибку или логируем).