Паттерны конкурентности в Go
Это финальная статья блока по конкурентности. Горутины, каналы, sync-примитивы, контекст и планировщик — всё это инструменты. Паттерны — это проверенные способы их комбинировать для решения типовых задач. Здесь не будет новых концепций, только практика: берём то что знаем и складываем в рабочие конструкции.
Pipeline — конвейер обработки
Pipeline — последовательность стадий, где каждая стадия читает данные из входного канала, трансформирует их и отдаёт в выходной. Стадии работают конкурентно — пока одна обрабатывает текущий элемент, следующая уже работает с предыдущим:
// Стадия 1: генератор — преобразует слайс в канал
func generate(nums []int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
out <- n
}
}()
return out
}
// Стадия 2: возведение в квадрат
func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
out <- n * n
}
}()
return out
}
// Стадия 3: фильтрация чётных
func filterEven(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
if n%2 == 0 {
out <- n
}
}
}()
return out
}
func main() {
// Собираем конвейер
nums := generate([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10})
squared := square(nums)
filtered := filterEven(squared)
for result := range filtered {
fmt.Println(result) // 4, 16, 36, 64, 100
}
}
Каждая стадия — отдельная функция, возвращающая канал. Стадии соединяются как unix-пайпы: cat file | grep pattern | wc -l. Добавление новой стадии не ломает существующие.
Pipeline с отменой через Context
Без контекста pipeline не остановить снаружи — горутины зависнут если потребитель перестанет читать:
func generateCtx(ctx context.Context, nums []int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
select {
case out <- n:
case <-ctx.Done(): // прерываемся если контекст отменён
return
}
}
}()
return out
}
func squareCtx(ctx context.Context, in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
select {
case out <- n * n:
case <-ctx.Done():
return
}
}
}()
return out
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
nums := generateCtx(ctx, []int{1, 2, 3, 4, 5})
squared := squareCtx(ctx, nums)
for result := range squared {
fmt.Println(result)
}
}
Правило: каждая стадия pipeline должна поддерживать отмену через context — иначе при отмене горутины зависнут, заблокированные на отправке в канал, которого никто не читает.
Fan-out / Fan-in
Fan-out — распределение работы между несколькими горутинами из одного источника. Fan-in — объединение результатов из нескольких каналов в один. Вместе они реализуют параллельную обработку с агрегацией результатов:
// Fan-out: запускаем N воркеров, каждый читает из одного канала
func fanOut(ctx context.Context, in <-chan Job, workers int) []<-chan Result {
channels := make([]<-chan Result, workers)
for i := 0; i < workers; i++ {
channels[i] = worker(ctx, in)
}
return channels
}
func worker(ctx context.Context, in <-chan Job) <-chan Result {
out := make(chan Result)
go func() {
defer close(out)
for job := range in {
select {
case <-ctx.Done():
return
case out <- process(job):
}
}
}()
return out
}
// Fan-in: объединяем несколько каналов в один
func fanIn(ctx context.Context, channels ...<-chan Result) <-chan Result {
merged := make(chan Result)
var wg sync.WaitGroup
// Для каждого входного канала запускаем горутину-форвардер
forward := func(ch <-chan Result) {
defer wg.Done()
for result := range ch {
select {
case merged <- result:
case <-ctx.Done():
return
}
}
}
wg.Add(len(channels))
for _, ch := range channels {
go forward(ch)
}
// Закрываем merged когда все форвардеры завершились
go func() {
wg.Wait()
close(merged)
}()
return merged
}
// Использование
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
jobs := generateJobs(ctx, 1000)
// Fan-out: 5 воркеров параллельно обрабатывают задачи
workerChannels := fanOut(ctx, jobs, 5)
// Fan-in: собираем все результаты в один канал
results := fanIn(ctx, workerChannels...)
for result := range results {
fmt.Println(result)
}
}
Fan-out/fan-in эффективен когда задачи независимы и каждая занимает примерно одинаковое время. Если время сильно варьируется — лучше worker pool.
Worker Pool — пул воркеров
Worker pool — фиксированное количество горутин, обрабатывающих очередь задач. В отличие от fan-out, количество горутин не зависит от количества задач:
type Job struct {
ID int
Data string
}
type Result struct {
JobID int
Output string
Err error
}
func workerPool(
ctx context.Context,
numWorkers int,
jobs <-chan Job,
) <-chan Result {
results := make(chan Result, numWorkers)
var wg sync.WaitGroup
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
for {
select {
case job, ok := <-jobs:
if !ok {
return // канал закрыт — завершаем воркер
}
output, err := processJob(job)
results <- Result{
JobID: job.ID,
Output: output,
Err: err,
}
case <-ctx.Done():
return
}
}
}(i)
}
// Закрываем results когда все воркеры завершились
go func() {
wg.Wait()
close(results)
}()
return results
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
// Создаём задачи
jobs := make(chan Job, 100)
go func() {
defer close(jobs)
for i := 0; i < 1000; i++ {
jobs <- Job{ID: i, Data: fmt.Sprintf("data-%d", i)}\n }
}()
// Запускаем пул из 10 воркеров
results := workerPool(ctx, 10, jobs)
// Собираем результаты
var errors []error
for result := range results {
if result.Err != nil {
errors = append(errors, result.Err)
continue
}
fmt.Println(result.Output)
}
}
Динамический worker pool
Иногда нужно регулировать количество воркеров в рантайме. Простой способ — семафор:
type DynamicPool struct {
sem chan struct{}
}
func NewDynamicPool(maxWorkers int) *DynamicPool {
return &DynamicPool{
sem: make(chan struct{}, maxWorkers),
}
}
func (p *DynamicPool) Submit(ctx context.Context, fn func()) error {
select {
case p.sem <- struct{}{}: // захватываем слот
case <-ctx.Done():
return ctx.Err()
}
go func() {
defer func() { <-p.sem }() // освобождаем слот
fn()
}()
return nil
}
// Использование
pool := NewDynamicPool(10) // максимум 10 одновременных задач
for i := 0; i < 1000; i++ {
i := i
if err := pool.Submit(ctx, func() {
processItem(i)
}); err != nil {
break // контекст отменён
}
}
Semaphore — ограничение параллелизма
Семафор — универсальный инструмент для ограничения количества одновременных операций. В Go идиоматично реализуется через буферизованный канал:
// Ограничиваем до 5 одновременных запросов к внешнему API
sem := make(chan struct{}, 5)
var wg sync.WaitGroup
for _, url := range urls {
wg.Add(1)
go func(u string) {
defer wg.Done()
sem <- struct{}{} // захватываем слот (блокируется если занято)
defer func() { <-sem }() // освобождаем слот
fetch(u)
}(url)
}
wg.Wait()
Для production-кода лучше использовать golang.org/x/sync/semaphore — он поддерживает взятие нескольких слотов за раз и отмену через контекст:
import "golang.org/x/sync/semaphore"
sem := semaphore.NewWeighted(5) // максимум 5 единиц
for _, url := range urls {
// Захватываем 1 единицу с поддержкой отмены
if err := sem.Acquire(ctx, 1); err != nil {
break // контекст отменён
}
go func(u string) {
defer sem.Release(1)
fetch(u)
}(url)
}
// Ждём завершения всех горутин
sem.Acquire(ctx, 5)
errgroup — горутины с обработкой ошибок
sync.WaitGroup не умеет обрабатывать ошибки из горутин. golang.org/x/sync/errgroup — его расширение с поддержкой ошибок и контекста:
import "golang.org/x/sync/errgroup"
func fetchAll(ctx context.Context, urls []string) ([]string, error) {
g, ctx := errgroup.WithContext(ctx)
results := make([]string, len(urls))
for i, url := range urls {
i, url := i, url // захватываем переменные
g.Go(func() error {
resp, err := fetchURL(ctx, url)
if err != nil {
return fmt.Errorf("fetch %s: %w", url, err)
}
results[i] = resp // безопасно: каждый пишет в свой индекс
return nil
})
}
// Ждём все горутины. Возвращает первую ошибку если была
if err := g.Wait(); err != nil {
return nil, err
}
return results, nil
}
errgroup.WithContext возвращает контекст, который автоматически отменяется при первой ошибке в любой горутине. Остальные горутины должны проверять ctx.Done() чтобы корректно завершиться.
errgroup с ограничением параллелизма
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(10) // максимум 10 горутин одновременно
for _, job := range jobs {
job := job
g.Go(func() error { // блокируется если уже 10 горутин работает
return process(ctx, job)
})
}
if err := g.Wait(); err != nil {
return err
}
SetLimit появился в Go 1.20 — встроенный семафор в errgroup, не нужно отдельно управлять.
Timeout и Retry
Два паттерна, которые часто нужны вместе при работе с внешними сервисами:
// Timeout: ограничение времени на операцию
func withTimeout(ctx context.Context, timeout time.Duration, fn func(ctx context.Context) error) error {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
return fn(ctx)
}
// Retry с экспоненциальным backoff
func withRetry(ctx context.Context, attempts int, fn func() error) error {
var err error
for i := 0; i < attempts; i++ {
if err = fn(); err == nil {
return nil
}
// Не ретраим если контекст отменён
if ctx.Err() != nil {
return ctx.Err()
}
// Экспоненциальный backoff: 100ms, 200ms, 400ms...
backoff := time.Duration(1<<uint(i)) * 100 * time.Millisecond
select {
case <-time.After(backoff):
case <-ctx.Done():
return ctx.Err()
}
}
return fmt.Errorf("все %d попыток исчерпаны: %w", attempts, err)
}
// Комбинируем
func callAPI(ctx context.Context) error {
return withRetry(ctx, 3, func() error {
return withTimeout(ctx, 2*time.Second, func(ctx context.Context) error {
return httpCall(ctx)
})
})
}
Or-Done — чтение с отменой
Когда нужно читать из канала с возможностью прерывания через Done — повторяющийся select можно вынести в хелпер:
// orDone оборачивает чтение из канала с поддержкой отмены
func orDone(ctx context.Context, ch <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for {
select {
case <-ctx.Done():
return
case v, ok := <-ch:
if !ok {
return
}
select {
case out <- v:
case <-ctx.Done():
return
}
}
}
}()
return out
}
// Использование — вместо повторяющегося select везде
for val := range orDone(ctx, source) {
process(val)
}
Tee — разветвление канала
Иногда нужно направить данные из одного канала в два потребителя:
func tee(ctx context.Context, in <-chan int) (<-chan int, <-chan int) {
out1 := make(chan int)
out2 := make(chan int)
go func() {
defer close(out1)
defer close(out2)
for val := range orDone(ctx, in) {
// Локальные копии для двух select
o1, o2 := out1, out2
// Отправляем в оба канала — используем два select
// чтобы не зависеть от порядка готовности
for i := 0; i < 2; i++ {
select {
case o1 <- val:
o1 = nil // уже отправили, исключаем из следующего select
case o2 <- val:
o2 = nil
}
}
}
}()
return out1, out2
}
// Использование
source := generate(ctx, data)
forLogger, forProcessor := tee(ctx, source)
go logAll(forLogger)
processAll(forProcessor)
Выбор паттерна
На практике выбор между паттернами определяется характером задачи:
| Задача | Паттерн |
|---|---|
| Последовательная трансформация данных | Pipeline |
| Один источник, много обработчиков | Fan-out |
| Много источников, один потребитель | Fan-in |
| Очередь задач с ограниченным параллелизмом | Worker Pool |
| Ограничить конкурентность существующего кода | Semaphore |
| Параллельные задачи с обработкой ошибок | errgroup |
| Внешние вызовы с ненадёжным сервисом | Timeout + Retry |
На практике паттерны комбинируются: типичный backend-сервис использует worker pool для обработки входящих запросов, errgroup для параллельных запросов к зависимостям, semaphore для ограничения нагрузки на БД, и pipeline для трансформации данных внутри обработчика.
Вопросы на собеседовании
Q: Что такое pipeline? В чём его преимущества?
A: Последовательность стадий обработки, соединённых каналами. Каждая стадия — независимая горутина, читает из входного канала и пишет в выходной. Стадии работают конкурентно — пока одна обрабатывает текущий элемент, другая уже работает со следующим. Преимущества: модульность (стадии независимы), конкурентность без явной синхронизации, естественная поддержка backpressure.
Q: Чем fan-out отличается от worker pool?
A: В fan-out каждый воркер получает свой канал — источник данных копируется или распределяется по воркерам, количество воркеров фиксировано при создании. В worker pool все воркеры читают из одного общего канала задач — естественная балансировка нагрузки, медленный воркер просто берёт меньше задач. Worker pool предпочтительнее когда время обработки задач варьируется.
Q: Как реализовать семафор в Go?
A: Через буферизованный канал: sem := make(chan struct{}, N). Захват: sem <- struct{}{}, освобождение: <-sem. Блокируется когда N слотов заняты. Для production лучше golang.org/x/sync/semaphore с поддержкой контекста и взятия нескольких единиц.
Q: Чем errgroup лучше WaitGroup?
A: errgroup добавляет обработку ошибок из горутин — Wait() возвращает первую ошибку. С errgroup.WithContext контекст автоматически отменяется при первой ошибке, сигнализируя остальным горутинам. SetLimit (Go 1.20) добавляет встроенный семафор. WaitGroup только ждёт завершения без обработки ошибок.
Q: Почему важно поддерживать контекст в каждой стадии pipeline?
A: Без контекста при отмене потребитель перестаёт читать из канала, и горутины-стадии зависают, заблокированные на отправке — утечка горутин. С ctx.Done() в каждом select горутины корректно завершаются при отмене контекста.
Q: Как реализовать retry с экспоненциальным backoff?
A: Цикл на N попыток, между попытками — time.After(backoff) в select с ctx.Done(). Backoff удваивается каждую итерацию: time.Duration(1<<uint(i)) * baseDelay. Обязательно проверять ctx.Err() чтобы не ретраить при отменённом контексте.
Q: Как безопасно писать результаты из нескольких горутин в один слайс?
A: Если каждая горутина пишет в свой индекс — синхронизация не нужна, горутины не конкурируют за одну ячейку. Именно так работает errgroup с results[i] = resp — i уникален для каждой горутины. Если индексы пересекаются — нужен мьютекс или канал результатов.
Q: Что такое backpressure и как pipeline его обеспечивает?
A: Backpressure — механизм при котором быстрый производитель замедляется если потребитель не успевает. В pipeline это происходит автоматически: если стадия N медленная, её входной канал заполнен, предыдущая стадия блокируется на отправке и тоже замедляется. Цепочка блокировок распространяется до источника.
Задачи: Паттерны конкурентности
Задача 1: Merge каналов
Уровень: Лёгкая
Что проверяет: fan-in, работа с несколькими каналами
Условие: Напиши функцию merge(channels ...<-chan int) <-chan int которая объединяет несколько каналов в один. Результирующий канал закрывается когда все входные каналы закрыты.
Решение:
package main
import (
"fmt"
"sync"
)
func merge(channels ...<-chan int) <-chan int {
out := make(chan int)
var wg sync.WaitGroup
forward := func(ch <-chan int) {
defer wg.Done()
for v := range ch {
out <- v
}
}
wg.Add(len(channels))
for _, ch := range channels {
go forward(ch)
}
go func() {
wg.Wait()
close(out)
}()
return out
}
func makeChannel(vals ...int) <-chan int {
ch := make(chan int)
go func() {
defer close(ch)
for _, v := range vals {
ch <- v
}
}()
return ch
}
func main() {
ch1 := makeChannel(1, 2, 3)
ch2 := makeChannel(4, 5, 6)
ch3 := makeChannel(7, 8, 9)
for v := range merge(ch1, ch2, ch3) {
fmt.Print(v, " ") // порядок непредсказуем, но все числа будут\n }
}
Задача 2: Retry с errgroup
Уровень: Средняя
Что проверяет: errgroup, обработка ошибок из нескольких горутин
Условие: Напиши функцию fetchAll(ctx context.Context, urls []string) ([]string, error) которая конкурентно загружает все URL (имитируй через time.Sleep + возврат строки), но не более 3 одновременно. При ошибке хотя бы одного — отменяет остальные и возвращает ошибку.
Решение:
package main
import (
"context"
"fmt"
"time"
"golang.org/x/sync/errgroup"
)
// Имитация загрузки URL
func fetchURL(ctx context.Context, url string) (string, error) {
select {
case <-time.After(100 * time.Millisecond):
return "content of " + url, nil
case <-ctx.Done():
return "", ctx.Err()
}
}
func fetchAll(ctx context.Context, urls []string) ([]string, error) {
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(3) // не более 3 одновременно
results := make([]string, len(urls))
for i, url := range urls {
i, url := i, url
g.Go(func() error {
content, err := fetchURL(ctx, url)
if err != nil {
return fmt.Errorf("fetch %s: %w", url, err)
}
results[i] = content
return nil
})
}
if err := g.Wait(); err != nil {
return nil, err
}
return results, nil
}
func main() {
ctx := context.Background()
urls := []string{
"https://example.com/1",
"https://example.com/2",
"https://example.com/3",
"https://example.com/4",
"https://example.com/5",
}
results, err := fetchAll(ctx, urls)
if err != nil {
fmt.Println("error:", err)\n return
}
for _, r := range results {
fmt.Println(r)
}
}
Задача 3: Rate limiter
Уровень: Сложная
Что проверяет: комбинирование паттернов, ticker, семафор
Условие: Реализуй RateLimiter который позволяет не более N запросов в секунду. Метод Allow(ctx context.Context) error блокируется если лимит исчерпан, и возвращает nil когда можно выполнить запрос. Если контекст отменён — возвращает ошибку.
Подсказка: Используй буферизованный канал как токен-bucket. Ticker пополняет токены с нужной частотой.
Решение:
package main
import (
"context"
"fmt"
"time"
)
type RateLimiter struct {
tokens chan struct{}
}
func NewRateLimiter(rps int) *RateLimiter {
rl := &RateLimiter{
tokens: make(chan struct{}, rps),
}
// Заполняем начальные токены
for i := 0; i < rps; i++ {
rl.tokens <- struct{}{}
}
// Пополняем токены с частотой rps/сек
go func() {
ticker := time.NewTicker(time.Second / time.Duration(rps))
defer ticker.Stop()
for range ticker.C {
select {
case rl.tokens <- struct{}{}: // добавляем токен если есть место
default: // буфер полон — пропускаем
}
}
}()
return rl
}
func (rl *RateLimiter) Allow(ctx context.Context) error {
select {
case <-rl.tokens: // берём токен
return nil
case <-ctx.Done():
return ctx.Err()
}
}
func main() {
limiter := NewRateLimiter(3) // 3 запроса в секунду
ctx := context.Background()
for i := 0; i < 9; i++ {
if err := limiter.Allow(ctx); err != nil {
fmt.Println("cancelled:", err)\n return
}
fmt.Printf("запрос %d выполнен в %s\n", i+1,
time.Now().Format("15:04:05.000"))
}
}
// Первые 3 выполняются сразу (начальные токены).
// Следующие — по 3 в секунду.
Викторина: какой паттерн выбрать?
Для каждой ситуации выбери паттерн и объясни почему именно он.
Ситуация 1
Нужно скачать 1000 изображений с внешнего сервиса. Сервис не выдерживает больше 10 одновременных запросов.
Ответ: Worker Pool или Semaphore.
Semaphore: ограничиваем конкурентность до 10.
Запускаем горутину на каждое изображение, но одновременно
работают не более 10. Проще реализовать если не нужна
очередь и переиспользование воркеров.
Worker Pool: если изображений постоянный поток (стрим),
а не фиксированный список — pool эффективнее: воркеры
живут долго и берут задачи из общей очереди.
Ситуация 2
Нужно читать строки из файла, парсить каждую как JSON, валидировать и сохранять в базу данных. Чтение быстрое, парсинг медленный, запись в БД ещё медленнее.
Ответ: Pipeline.
Три стадии с разной скоростью — классический Pipeline.
Стадия 1: читаем строки → канал.
Стадия 2: парсим JSON конкурентно (fan-out на несколько парсеров).
Стадия 3: пишем в БД (с ограничением через semaphore).
Каждая стадия работает независимо, backpressure возникает
автоматически через каналы.
Ситуация 3
Сервис обращается к трём источникам данных (Redis, PostgreSQL, внешний API) и возвращает агрегированный результат. Все три запроса независимы.
Ответ: errgroup (Fan-out + сбор результатов).
Три независимых запроса — запускаем параллельно через errgroup.
При ошибке любого — контекст отменяется, остальные прерываются.
Wait() возвращает первую ошибку.
Результаты пишем в индексированный слайс — без мьютекса,
каждая горутина пишет в свой индекс.
Ситуация 4
Нужно отправить уведомление через три канала (email, SMS, push). Достаточно чтобы хотя бы один канал сработал успешно — остальные можно отменить.
Ответ: Fan-out + first-result-wins через контекст.
Запускаем три горутины параллельно.
Первая успешная пишет в буферизованный канал.
Select читает первый результат и вызывает cancel() —
остальные горутины получают ctx.Done() и завершаются.
Паттерн "fastest" из задачи на Context.
Ситуация 5
Нужно агрегировать метрики из 50 микросервисов каждые 30 секунд. Каждый опрос занимает до 2 секунд.
Ответ: Fan-out + Fan-in с таймаутом.
Fan-out: запускаем 50 горутин параллельно.
Fan-in: собираем результаты в один канал.
Context с таймаутом 2 секунды — медленные сервисы
не задерживают агрегацию.
Частичный результат (только те кто успел) лучше
чем ждать всех.
Ситуация 6
Нужно обрабатывать события из Kafka. Каждое событие независимо. Хочется максимальной пропускной способности но не потерять события при завершении.
Ответ: Worker Pool с graceful shutdown.
Worker Pool: фиксированное количество воркеров читают
из общей очереди (канала куда пишет Kafka-консьюмер).
При завершении: закрываем канал задач, воркеры
дочитывают оставшиеся события и завершаются.
WaitGroup ждёт всех воркеров перед остановкой процесса.
Ситуация 7
Нужно логировать каждый запрос к API но запись в файл медленная и не должна блокировать обработчик.
Ответ: Буферизованный канал как асинхронная очередь.
Обработчик пишет в буферизованный канал (неблокирующий
для большинства случаев).
Отдельная горутина-воркер читает из канала и пишет в файл.
Если канал полон (буфер переполнен) — можно дропать
логи или блокироваться в зависимости от требований.
Ситуация 8
Нужно реализовать поиск по нескольким источникам (ElasticSearch, PostgreSQL full-text, внешний API). Возвращать объединённые результаты всех источников.
Ответ: Fan-out + Fan-in.
Fan-out: три горутины параллельно делают поиск.
Fan-in: merge результатов из трёх каналов в один.
Дедупликация результатов уже в основной горутине.
errgroup для обработки ошибок — если один источник
упал, всё равно возвращаем результаты от остальных
(игнорируем ошибку или логируем).