Пакет sync в Go
После горутин и каналов естественно встаёт вопрос: что делать, когда каналы — не лучший инструмент? Каналы отлично передают данные между горутинами, но когда нужно просто защитить общий ресурс от одновременного доступа — мьютекс проще и понятнее. Пакет sync предоставляет именно такие примитивы: блокировки, барьеры, однократную инициализацию и пул объектов.
Mutex — взаимное исключение
sync.Mutex гарантирует что в критической секции одновременно находится только одна горутина:
type Counter struct {
mu sync.Mutex
value int
}
func (c *Counter) Increment() {
c.mu.Lock()
defer c.mu.Unlock()
c.value++
}
func (c *Counter) Value() int {
c.mu.Lock()
defer c.mu.Unlock()
return c.value
}
Несколько правил, которые стоит соблюдать всегда:
defer mu.Unlock() сразу после Lock() — защита от паники внутри критической секции. Если не использовать defer и внутри произойдёт паника — мьютекс останется залоченным навсегда, все горутины, ожидающие его, заблокируются:
// Плохо: паника внутри оставит мьютекс залоченным
func (c *Counter) Bad() {
c.mu.Lock()
doSomethingThatMightPanic() // паника — Unlock не вызовется
c.mu.Unlock()
}
// Хорошо: defer гарантирует Unlock при любом исходе
func (c *Counter) Good() {
c.mu.Lock()
defer c.mu.Unlock()
doSomethingThatMightPanic()
}
Мьютекс нельзя копировать после первого использования. Mutex содержит внутреннее состояние, и копирование заблокированного мьютекса скопирует это состояние — новый мьютекс окажется залоченным без возможности разблокировки:
type Cache struct {
mu sync.Mutex
data map[string]string
}
// Ошибка: копирование структуры копирует мьютекс
func badFunc(c Cache) { ... }
// Правильно: передаём по указателю
func goodFunc(c *Cache) { ... }
go vet обнаруживает копирование мьютексов и выдаёт предупреждение.
Минимизируйте критическую секцию. Чем дольше мьютекс удерживается — тем дольше остальные горутины ждут. Вычисления, не требующие защиты, выносите за пределы Lock/Unlock:
// Плохо: долгая операция под мьютексом
func (c *Cache) Set(key string) {
c.mu.Lock()
defer c.mu.Unlock()
value := fetchFromDatabase(key) // долго, но не требует защиты
c.data[key] = value
}
// Хорошо: под мьютексом только необходимое
func (c *Cache) Set(key string) {
value := fetchFromDatabase(key) // вне мьютекса
c.mu.Lock()
defer c.mu.Unlock()
c.data[key] = value
}
RWMutex — разделение читателей и писателей
sync.RWMutex — расширение обычного мьютекса для сценария "много читателей, редкие записи". Читатели не блокируют друг друга, писатель блокирует всех:
type Store struct {
mu sync.RWMutex
data map[string]string
}
func (s *Store) Get(key string) string {
s.mu.RLock() // разделяемая блокировка — другие читатели проходят
defer s.mu.RUnlock()
return s.data[key]
}
func (s *Store) Set(key, value string) {
s.mu.Lock() // эксклюзивная блокировка — все ждут
defer s.mu.Unlock()
s.data[key] = value
}
Когда RWMutex быстрее, а когда нет
RWMutex не всегда выигрывает у обычного Mutex — у него выше накладные расходы на управление состоянием. Он даёт преимущество только когда:
- Чтений значительно больше чем записей (соотношение 10:1 и выше)
- Критическая секция достаточно долгая, чтобы параллельное чтение давало ощутимый выигрыш
При коротких критических секциях и смешанной нагрузке обычный Mutex может оказаться быстрее. Как всегда — решает бенчмарк.
Ловушка: рекурсивный RLock
RWMutex не поддерживает рекурсивные блокировки. Если горутина вызывает RLock дважды, а между ними другая горутина пытается взять Lock — deadlock:
// Deadlock: вторая горutина пытается взять Lock пока первая держит RLock
// и хочет взять ещё один RLock
func (s *Store) DeadlockExample() {
s.mu.RLock()
defer s.mu.RUnlock()
// где-то внутри вызывается метод, который тоже берёт RLock
s.Get("key") // RLock внутри — если между ними был Lock-запрос, deadlock
}
WaitGroup — барьер для горутин
sync.WaitGroup — счётчик горутин. Позволяет дождаться завершения произвольного числа горутин:
func processItems(items []Item) {
var wg sync.WaitGroup
for _, item := range items {
wg.Add(1)
go func(it Item) {
defer wg.Done()
process(it)
}(item)
}
wg.Wait() // блокируется пока счётчик не станет 0
}
Передача WaitGroup
WaitGroup нельзя копировать — передавайте по указателю:
// Плохо: копирование WaitGroup
func worker(wg sync.WaitGroup) { // копия — Done не влияет на оригинал
defer wg.Done()
}
// Хорошо: указатель
func worker(wg *sync.WaitGroup) {
defer wg.Done()
}
Паттерн: сбор результатов с WaitGroup
WaitGroup часто используется вместе с каналом для сбора результатов:
func fetchAll(urls []string) []Result {
results := make(chan Result, len(urls))
var wg sync.WaitGroup
for _, url := range urls {
wg.Add(1)
go func(u string) {
defer wg.Done()
results <- fetch(u)
}(url)
}
// Закрываем канал когда все горутины завершились
go func() {
wg.Wait()
close(results)
}()
var all []Result
for r := range results {
all = append(all, r)
}
return all
}
Once — однократная инициализация
sync.Once гарантирует что функция выполнится ровно один раз, даже если Do вызывается из множества горутин одновременно:
type Singleton struct {
data string
}
var (
instance *Singleton
once sync.Once
)
func GetInstance() *Singleton {
once.Do(func() {
instance = &Singleton{data: "initialized"}
})
return instance
}
Сколько бы горутин ни вызвали GetInstance одновременно — инициализация произойдёт ровно раз. Остальные горутины подождут завершения и получат уже готовое значение.
Once vs init()
init() тоже выполняется однократно, но всегда при старте программы. Once позволяет отложить инициализацию до момента первого использования (lazy initialization) — полезно когда инициализация дорогая или зависит от данных рантайма:
type DB struct {
once sync.Once
conn *sql.DB
}
func (d *DB) getConn() *sql.DB {
d.once.Do(func() {
d.conn, _ = sql.Open("postgres", dsn)
})
return d.conn
}
Ловушка: паника внутри Once
Если функция внутри Do паникует — Once считает её выполненной. Повторный вызов Do не перезапустит функцию:
var once sync.Once
func init() {
defer func() { recover() }()
once.Do(func() {
panic("что-то пошло не так")
})
}
once.Do(func() {
fmt.Println("я никогда не выполнюсь")
})
// Do уже "выполнен" — даже несмотря на панику
Если инициализация может упасть — лучше обрабатывать ошибку явно и хранить её рядом с результатом:
type lazyDB struct {
once sync.Once
db *sql.DB
err error
}
func (l *lazyDB) DB() (*sql.DB, error) {
l.once.Do(func() {
l.db, l.err = sql.Open("postgres", dsn)
})
return l.db, l.err
}
Pool — переиспользование объектов
sync.Pool — пул временных объектов для снижения нагрузки на GC. Вместо того чтобы каждый раз аллоцировать новый объект, берём из пула и возвращаем обратно:
var bufPool = sync.Pool{
New: func() interface{} {
return new(bytes.Buffer) // создаётся если пул пуст
},
}
func processRequest(data []byte) string {
buf := bufPool.Get().(*bytes.Buffer) // берём из пула
defer func() {
buf.Reset() // очищаем перед возвратом
bufPool.Put(buf) // возвращаем в пул
}()
buf.Write(data)
return buf.String()
}
Важные свойства Pool
Объекты могут быть удалены GC в любой момент. Pool не является кешем — GC может очистить его между циклами сборки. Не храните в пуле объекты, которые нельзя пересоздать:
// Плохо: ожидание что объект сохранится между вызовами
pool.Put(expensiveObject)
// ... позже ...
obj := pool.Get() // может вернуть nil если GC очистил пул
Всегда проверяйте тип при Get. Get возвращает interface{} — нужен type assertion. Если пул пуст и New не задан — вернёт nil:
var pool sync.Pool // без New
obj := pool.Get()
if obj == nil {
// пул пуст и New не задан
}
Очищайте объект перед возвратом в пул. Объект из пула может содержать данные предыдущего использования:
// Плохо: возвращаем грязный буфер
bufPool.Put(buf)
// Хорошо: сбрасываем состояние
buf.Reset()
bufPool.Put(buf)
Где Pool реально помогает
Pool эффективен для объектов, которые: часто создаются и уничтожаются, дорого инициализируются, и не имеют состояния между использованиями. Классические примеры: bytes.Buffer, json.Encoder, временные слайсы. Сам encoding/json и fmt используют sync.Pool внутри.
sync.Map — конкурентный map
Стандартный map не потокобезопасен. sync.Map — специализированная конкурентная реализация, оптимизированная для конкретного сценария:
var sm sync.Map
// Store
sm.Store("key", "value")
// Load
val, ok := sm.Load("key")
if ok {
fmt.Println(val.(string))
}
// LoadOrStore — атомарно: загрузить или сохранить если нет
actual, loaded := sm.LoadOrStore("key", "default")
fmt.Println(loaded) // true если ключ уже был
// Delete
sm.Delete("key")
// Range — итерация (порядок не гарантирован)
sm.Range(func(k, v interface{}) bool {
fmt.Println(k, v)
return true // false — прервать итерацию
})
Когда sync.Map оправдан
Внутри sync.Map два уровня хранения: read-only map (доступен без блокировки через atomic) и dirty map (под мьютексом). Read path не требует блокировки — это главное преимущество.
Оправдан в двух сценариях:
- Ключи записываются один раз, потом только читаются (кеш с редкими обновлениями)
- Разные горутины работают с непересекающимися наборами ключей
При частых записях разных ключей sync.Map медленнее обычного map с sync.Mutex — dirty map постоянно промотируется в read, что дорого. В таких случаях лучше обычный map с мьютексом или шардированный map.
Атомарные операции — sync/atomic
Для простых числовых операций мьютекс избыточен. Пакет sync/atomic предоставляет атомарные операции на уровне процессора:
import "sync/atomic"
var counter int64
// Атомарный инкремент
atomic.AddInt64(&counter, 1)
// Атомарное чтение
val := atomic.LoadInt64(&counter)
// Атомарная запись
atomic.StoreInt64(&counter, 0)
// Compare-And-Swap — основа lock-free алгоритмов
swapped := atomic.CompareAndSwapInt64(&counter, 0, 1)
// если counter == 0, установить 1, вернуть true
С Go 1.19 появился типизированный atomic.Int64 — удобнее и безопаснее:
var counter atomic.Int64
counter.Add(1)
counter.Load()
counter.Store(0)
counter.CompareAndSwap(0, 1)
Атомарные операции быстрее мьютекса, но применимы только к простым значениям (числа, указатели). Для защиты составных операций или структур данных нужен мьютекс.
Вопросы на собеседовании
Q: В чём разница между Mutex и RWMutex? Когда что использовать?
A: Mutex — взаимное исключение: одна горутина в критической секции. RWMutex разделяет читателей и писателей: несколько читателей проходят одновременно, писатель блокирует всех. RWMutex выгоден при большом преобладании чтений (10:1 и выше) и достаточно долгих критических секциях. При коротких секциях и смешанной нагрузке обычный Mutex может оказаться быстрее из-за меньших накладных расходов.
Q: Почему мьютекс нельзя копировать?
A: Mutex хранит внутреннее состояние (залочен/не залочен). Копирование создаёт новый мьютекс с тем же состоянием — если оригинал был залочен, копия тоже будет залочена без возможности разблокировки. go vet обнаруживает это. Решение: всегда передавать структуры с мьютексами по указателю.
Q: Зачем использовать defer mu.Unlock()? Почему не вызвать Unlock явно?
A: defer гарантирует Unlock при любом завершении функции — в том числе при панике. Явный вызов Unlock пропустится если между Lock и Unlock произойдёт паника или ранний return — мьютекс останется залоченным навсегда.
Q: Что такое sync.Once? Гарантирует ли он выполнение если функция запаниковала?
A: Примитив для однократного выполнения функции при любом количестве конкурентных вызовов. Да, если функция внутри Do паникует — Once считает её выполненной и не повторит. Для обработки ошибок нужно хранить error рядом с результатом.
Q: Что такое sync.Pool? Может ли GC удалить объекты из пула?
A: Пул переиспользуемых объектов для снижения нагрузки на GC. Да, GC может очистить пул в любой момент между циклами сборки — Pool не является кешем с гарантиями хранения. Нельзя рассчитывать что объект сохранится между вызовами Get.
Q: Когда sync.Map лучше map с мьютексом, а когда хуже?
A: Лучше когда: ключи записываются редко и читаются часто, или разные горутины работают с разными ключами — read path без блокировки через atomic. Хуже при частых записях разных ключей — dirty map постоянно промотируется в read, накладные расходы превышают выгоду. В общем случае map с Mutex проще и предсказуемее.
Q: Чем atomic операции отличаются от мьютекса?
A: Atomic операции выполняются одной CPU инструкцией без блокировки — быстрее и без риска deadlock. Но применимы только к простым значениям: числа и указатели. Мьютекс защищает произвольные составные операции и структуры данных.
Q: Как правильно передавать WaitGroup в функцию?
A: Только по указателю *sync.WaitGroup. При передаче по значению копируется внутреннее состояние — Done в горутине не уменьшит счётчик оригинала, и Wait никогда не разблокируется.
Q: Что вернёт sync.Pool.Get() если пул пуст и New не задан?
A: nil. Всегда нужно либо задавать New, либо проверять результат Get на nil.
Задачи: Sync
Задача 1: Потокобезопасный счётчик
Уровень: Лёгкая
Что проверяет: базовое использование Mutex
Условие: Реализуй потокобезопасный счётчик Counter с методами Increment(), Decrement() и Value() int. Запусти 1000 горутин которые инкрементируют счётчик и убедись что результат равен 1000.
Решение:
package main
import (
"fmt"
"sync"
)
type Counter struct {
mu sync.Mutex
value int
}
func (c *Counter) Increment() {
c.mu.Lock()
defer c.mu.Unlock()
c.value++
}
func (c *Counter) Decrement() {
c.mu.Lock()
defer c.mu.Unlock()
c.value--
}
func (c *Counter) Value() int {
c.mu.Lock()
defer c.mu.Unlock()
return c.value
}
func main() {
var wg sync.WaitGroup
c := &Counter{}
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
c.Increment()
}()
}
wg.Wait()
fmt.Println(c.Value()) // всегда 1000
}
Задача 2: Кеш с однократной инициализацией
Уровень: Средняя
Что проверяет: sync.Once, lazy initialization, обработка ошибок инициализации
Условие: Реализуй ConfigCache — структуру которая лениво загружает конфиг при первом обращении. Загрузка дорогая (имитируй через time.Sleep), поэтому должна происходить ровно один раз даже при конкурентных запросах. Если загрузка упала с ошибкой — возвращай ошибку при каждом вызове.
Решение:
package main
import (
"errors"
"fmt"
"sync"
"time"
)
type Config struct {
DSN string
}
type ConfigCache struct {
once sync.Once
config *Config
err error
}
func (c *ConfigCache) Get() (*Config, error) {
c.once.Do(func() {
fmt.Println("загружаем конфиг...") // выполнится ровно раз\n time.Sleep(100 * time.Millisecond)
// Имитируем загрузку
c.config = &Config{DSN: "postgres://localhost/mydb"}
// c.err = errors.New("failed to load config") // для теста ошибки
})
return c.config, c.err
}
func main() {
cache := &ConfigCache{}
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func(n int) {
defer wg.Done()
cfg, err := cache.Get()
if err != nil {
fmt.Printf("горутина %d: ошибка: %v\n", n, err)
return
}
fmt.Printf("горутина %d: DSN=%s\n", n, cfg.DSN)
}(i)
}
wg.Wait()
}
// "загружаем конфиг..." выведется ровно один раз
// несмотря на 5 конкурентных вызовов.
_ = errors.New // используем пакет
Задача 3: Пул воркеров с ограничением через semaphore
Уровень: Сложная
Что проверяет: комбинирование sync примитивов, реализация семафора
Условие: Реализуй функцию processAll(items []int, maxConcurrent int, fn func(int) int) []int которая обрабатывает все элементы конкурентно, но не более maxConcurrent одновременно. Порядок результатов должен совпадать с входными данными.
Подсказка: Семафор через буферизованный канал. Порядок через индексированный результирующий слайс.
Решение:
package main
import (
"fmt"
"sync"
"time"
)
func processAll(items []int, maxConcurrent int, fn func(int) int) []int {
results := make([]int, len(items))
sem := make(chan struct{}, maxConcurrent)
var wg sync.WaitGroup
for i, item := range items {
wg.Add(1)
go func(idx, val int) {
defer wg.Done()
sem <- struct{}{} // захватываем слот
defer func() { <-sem }() // освобождаем слот
results[idx] = fn(val)
}(i, item)
}
wg.Wait()
return results
}
func main() {
items := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
result := processAll(items, 3, func(n int) int {
time.Sleep(10 * time.Millisecond) // имитация работы
return n * n
})
fmt.Println(result) // [1 4 9 16 25 36 49 64 81 100]
}