跳到主要内容

Go 并发编程

本章将介绍 Go 语言强大的并发编程能力。Go 从语言层面原生支持并发,通过 goroutine 和 channel 使得并发编程变得简单而高效。

并发与并行

在开始学习 Go 的并发特性之前,我们需要理解两个重要概念:

  • 并发(Concurrency):是指同时处理多个任务的能力。在单核 CPU 上,通过时间片轮转的方式交替执行多个任务,从宏观上看像是同时进行的。
  • 并行(Parallelism):是指同时执行多个任务的能力。需要多核 CPU 支持,每个核心真正同时执行不同的任务。
关键区别

并发关注的是结构,即如何组织程序来处理多个任务;并行关注的是执行,即如何同时运行多个任务。Go 的并发模型可以让程序在单核上实现并发,在多核上实现并行。

Go 的并发模型

Go 采用了 CSP(Communicating Sequential Processes,通信顺序进程) 模型,核心理念是:

"不要通过共享内存来通信,而要通过通信来共享内存。"

这意味着在 Go 中,goroutine 之间通过 channel 传递数据来通信,而不是通过共享变量并加锁来同步。

Go 调度器:GMP 模型

Go 运行时实现了自己的调度器,称为 GMP 模型:

  • G(Goroutine):用户态的轻量级线程,初始栈大小只有 2KB
  • M(Machine):操作系统线程,真正执行代码的载体
  • P(Processor):逻辑处理器,负责管理和调度 G

调度过程:

  1. 每个 P 维护一个可运行的 G 队列(本地队列)
  2. M 绑定到 P 后,从 P 的队列中获取 G 执行
  3. 当 G 阻塞(如系统调用)时,M 会释放 P,让其他 M 绑定到这个 P
  4. 全局队列存放等待执行的 G,P 会定期从全局队列获取 G
    全局队列 (Global Queue)
┌─────────────────────────────┐
│ G1 G2 G3 G4 G5 ... │
└─────────────┬───────────────┘

┌─────────────┼─────────────┐
│ │ │
▼ ▼ ▼
┌───────┐ ┌───────┐ ┌───────┐
│ P1 │ │ P2 │ │ P3 │
│ 本地 │ │ 本地 │ │ 本地 │
│ 队列 │ │ 队列 │ │ 队列 │
│ G6 G7 │ │ G8 G9 │ │ G10 │
└───┬───┘ └───┬───┘ └───┬───┘
│ │ │
▼ ▼ ▼
┌───────┐ ┌───────┐ ┌───────┐
│ M1 │ │ M2 │ │ M3 │
└───────┘ └───────┘ └───────┘

理解 GMP 模型有助于:

  • 合理设置 GOMAXPROCS(默认为 CPU 核数)
  • 理解 goroutine 为什么如此轻量
  • 分析并发程序的性能问题

Goroutine

Goroutine 是 Go 中轻量级的并发执行单元。它比操作系统线程更轻量,初始栈大小只有 2KB(可以动态伸缩),而线程通常需要 1MB 以上的栈空间。

创建 Goroutine

使用 go 关键字启动一个 goroutine:

package main

import (
"fmt"
"time"
)

func sayHello() {
fmt.Println("Hello from goroutine!")
}

func main() {
// 启动 goroutine
go sayHello()

fmt.Println("Hello from main!")

// 等待 goroutine 执行完成
time.Sleep(time.Second)
}

输出顺序可能为:

Hello from main!
Hello from goroutine!
注意

main 函数本身也是一个 goroutine。当 main 函数返回时,所有其他 goroutine 都会被立即终止,不管它们是否执行完成。这就是为什么上面代码需要 time.Sleep 来等待 goroutine 完成。

匿名函数 Goroutine

可以使用匿名函数创建 goroutine:

func main() {
// 方式一:立即执行的匿名函数
go func() {
fmt.Println("匿名 goroutine")
}()

// 方式二:带参数的匿名函数
message := "Hello"
go func(msg string) {
fmt.Println(msg)
}(message)

time.Sleep(time.Second)
}

闭包变量捕获问题

在使用闭包时需要注意变量捕获的问题:

// ❌ 错误示例:所有 goroutine 可能都打印 3
for i := 0; i < 3; i++ {
go func() {
fmt.Println(i) // 捕获的是变量 i 的引用
}()
}

// ✅ 正确示例:将 i 作为参数传递
for i := 0; i < 3; i++ {
go func(n int) {
fmt.Println(n) // 每个 goroutine 有自己的 n 副本
}(i)
}

使用 sync.WaitGroup 等待

使用 time.Sleep 等待 goroutine 完成是不推荐的,应该使用 sync.WaitGroup

package main

import (
"fmt"
"sync"
"time"
)

func worker(id int, wg *sync.WaitGroup) {
// 延迟调用 Done,确保函数退出时通知 WaitGroup
defer wg.Done()

fmt.Printf("Worker %d starting\n", id)
time.Sleep(time.Second)
fmt.Printf("Worker %d done\n", id)
}

func main() {
var wg sync.WaitGroup

// 启动 5 个 goroutine
for i := 1; i <= 5; i++ {
// 在启动 goroutine 前增加计数器
wg.Add(1)
go worker(i, &wg)
}

// 等待所有 goroutine 完成
wg.Wait()
fmt.Println("All workers done")
}

WaitGroup 的三个方法:

  • Add(delta int):增加计数器,通常在启动 goroutine 前调用
  • Done():减少计数器,等价于 Add(-1)
  • Wait():阻塞直到计数器归零
最佳实践

wg.Add(1) 应该在启动 goroutine 之前调用,而不是在 goroutine 内部调用,以避免竞态条件。

Channel

Channel(通道)是 goroutine 之间通信的管道。通过 channel,一个 goroutine 可以发送值给另一个 goroutine。

创建 Channel

使用 make 创建 channel:

// 无缓冲 channel
ch := make(chan int)

// 有缓冲 channel,容量为 10
ch := make(chan int, 10)

发送和接收

使用 <- 操作符发送和接收数据:

ch := make(chan int)

// 发送数据
ch <- 42

// 接收数据
value := <-ch

// 接收数据并检查 channel 是否已关闭
value, ok := <-ch
// 如果 ok 为 false,表示 channel 已关闭且没有更多数据

无缓冲 Channel

无缓冲 channel 也称为同步 channel,发送操作会阻塞直到另一个 goroutine 执行接收操作:

func main() {
ch := make(chan string)

// 发送 goroutine
go func() {
ch <- "Hello" // 阻塞,直到有人接收
fmt.Println("消息已发送")
}()

// 接收
msg := <-ch // 阻塞,直到有数据可接收
fmt.Println(msg)
}

特点:

  • 发送和接收操作是同步的
  • 每次发送必须有对应的接收,否则会死锁
  • 适合用于 goroutine 间的同步

有缓冲 Channel

有缓冲 channel 有一个固定大小的缓冲区,发送操作只有在缓冲区满时才会阻塞:

func main() {
// 创建容量为 2 的缓冲 channel
ch := make(chan int, 2)

// 可以连续发送两个值而不阻塞
ch <- 1
ch <- 2

// 第三个发送会阻塞(缓冲区已满)
// ch <- 3 // 死锁!

// 接收数据
fmt.Println(<-ch) // 1
fmt.Println(<-ch) // 2
}

特点:

  • 发送操作在缓冲区未满时不会阻塞
  • 接收操作在缓冲区不为空时不会阻塞
  • 适合用于生产者-消费者模式

关闭 Channel

使用 close 函数关闭 channel:

ch := make(chan int, 3)

ch <- 1
ch <- 2
close(ch) // 关闭 channel

// 从已关闭的 channel 接收
value, ok := <-ch
fmt.Println(value, ok) // 1 true

value, ok = <-ch
fmt.Println(value, ok) // 2 true

value, ok = <-ch
fmt.Println(value, ok) // 0 false(channel 已空且关闭)

关闭 channel 的规则:

  1. 只有发送者应该关闭 channel,接收者不应该关闭
  2. 向已关闭的 channel 发送数据会 panic
  3. 从已关闭的 channel 接收会返回零值和 false
  4. 关闭一个已关闭的 channel 会 panic
  5. 关闭 nil channel 会 panic

使用 range 遍历 Channel

可以使用 for range 持续从 channel 接收数据,直到 channel 关闭:

func main() {
ch := make(chan int, 5)

// 发送数据
go func() {
for i := 0; i < 5; i++ {
ch <- i
}
close(ch) // 发送完毕后关闭
}()

// 接收数据直到 channel 关闭
for value := range ch {
fmt.Println(value)
}

fmt.Println("Channel 已关闭")
}

单向 Channel

Go 支持单向 channel 类型,用于限制 channel 的操作方向:

// 只发送 channel
func sender(ch chan<- int) {
ch <- 42
// <-ch // 编译错误!不能从只发送 channel 接收
}

// 只接收 channel
func receiver(ch <-chan int) {
value := <-ch
fmt.Println(value)
// ch <- 1 // 编译错误!不能向只接收 channel 发送
}

func main() {
ch := make(chan int)
go sender(ch)
go receiver(ch)
}

类型转换:

ch := make(chan int)        // 双向 channel
var sendCh chan<- int = ch // 转换为只发送
var recvCh <-chan int = ch // 转换为只接收

Select

select 语句用于同时等待多个 channel 操作,类似于网络编程中的 select/poll/epoll

基本用法

func main() {
ch1 := make(chan string)
ch2 := make(chan string)

go func() {
time.Sleep(time.Second)
ch1 <- "from ch1"
}()

go func() {
time.Sleep(500 * time.Millisecond)
ch2 <- "from ch2"
}()

// 等待最先到达的数据
select {
case msg1 := <-ch1:
fmt.Println("Received", msg1)
case msg2 := <-ch2:
fmt.Println("Received", msg2)
}
}

超时处理

使用 time.After 实现超时:

func main() {
ch := make(chan string)

go func() {
time.Sleep(2 * time.Second)
ch <- "result"
}()

select {
case res := <-ch:
fmt.Println(res)
case <-time.After(time.Second):
fmt.Println("Timeout!")
}
}

非阻塞操作

使用 default 实现非阻塞操作:

func main() {
ch := make(chan int, 1)

select {
case ch <- 42:
fmt.Println("发送成功")
default:
fmt.Println("发送失败(channel 满)")
}

select {
case value := <-ch:
fmt.Println("接收:", value)
default:
fmt.Println("接收失败(channel 空)")
}
}

检查 Channel 是否已关闭

func main() {
ch := make(chan int, 1)
close(ch)

select {
case value, ok := <-ch:
if ok {
fmt.Println("接收到:", value)
} else {
fmt.Println("Channel 已关闭")
}
default:
fmt.Println("没有数据")
}
}

并发安全

当多个 goroutine 同时访问共享资源时,需要使用同步机制来保证数据安全。

sync.Mutex

互斥锁用于保护临界区,同一时间只有一个 goroutine 可以访问:

package main

import (
"fmt"
"sync"
)

type Counter struct {
mu sync.Mutex
value int
}

func (c *Counter) Increment() {
c.mu.Lock() // 获取锁
defer c.mu.Unlock() // 释放锁
c.value++
}

func (c *Counter) Value() int {
c.mu.Lock()
defer c.mu.Unlock()
return c.value
}

func main() {
counter := Counter{}
var wg sync.WaitGroup

// 启动 1000 个 goroutine
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
counter.Increment()
}()
}

wg.Wait()
fmt.Println("Counter:", counter.Value()) // 1000
}

sync.RWMutex

读写锁允许多个读者同时读取,但写者需要独占访问:

package main

import (
"fmt"
"sync"
"time"
)

type Cache struct {
mu sync.RWMutex
data map[string]string
}

func (c *Cache) Get(key string) string {
c.mu.RLock() // 读锁
defer c.mu.RUnlock()
return c.data[key]
}

func (c *Cache) Set(key, value string) {
c.mu.Lock() // 写锁
defer c.mu.Unlock()
c.data[key] = value
}

func main() {
cache := &Cache{
data: make(map[string]string),
}

// 写操作
go func() {
cache.Set("key", "value")
}()

// 多个读操作可以同时进行
for i := 0; i < 3; i++ {
go func() {
fmt.Println(cache.Get("key"))
}()
}
}

读写锁的规则:

  • 读锁可以同时被多个 goroutine 获取
  • 写锁是独占的,只能被一个 goroutine 获取
  • 如果有写锁存在,读锁会被阻塞
  • 如果有读锁存在,写锁会被阻塞

sync/atomic

Go 提供了原子操作包,用于对基本类型进行无锁并发操作:

package main

import (
"fmt"
"sync"
"sync/atomic"
)

func main() {
var counter int64
var wg sync.WaitGroup

for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
atomic.AddInt64(&counter, 1) // 原子加操作
}()
}

wg.Wait()
fmt.Println("Counter:", counter) // 1000

// 比较并交换(Compare And Swap)
old := atomic.LoadInt64(&counter)
swapped := atomic.CompareAndSwapInt64(&counter, old, 0)
fmt.Println("Swapped:", swapped) // true
fmt.Println("Counter:", counter) // 0
}

常用的原子操作:

  • Add:原子加法
  • Load:原子读取
  • Store:原子存储
  • Swap:原子交换
  • CompareAndSwap:比较并交换

atomic.Int64(Go 1.19+)

Go 1.19 引入了新的原子类型,使用更加方便:

package main

import (
"fmt"
"sync"
"sync/atomic"
)

func main() {
var counter atomic.Int64
var wg sync.WaitGroup

for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
counter.Add(1)
}()
}

wg.Wait()
fmt.Println("Counter:", counter.Load()) // 1000

// 原子交换
old := counter.Swap(0)
fmt.Println("Old value:", old) // 1000
fmt.Println("New value:", counter.Load()) // 0

// 比较并交换
swapped := counter.CompareAndSwap(0, 42)
fmt.Println("Swapped:", swapped) // true
fmt.Println("Value:", counter.Load()) // 42
}

Context

Context 包用于在 API 边界和进程之间传递截止时间、取消信号和其他请求范围的值。

创建 Context

import "context"

// 背景 context,作为根 context
ctx := context.Background()

// TODO context,用于不确定使用哪个 context 时
ctx := context.TODO()

取消信号

package main

import (
"context"
"fmt"
"time"
)

func worker(ctx context.Context) {
for {
select {
case <-ctx.Done():
fmt.Println("Worker stopped:", ctx.Err())
return
default:
fmt.Println("Working...")
time.Sleep(500 * time.Millisecond)
}
}
}

func main() {
// 创建可取消的 context
ctx, cancel := context.WithCancel(context.Background())

go worker(ctx)

time.Sleep(2 * time.Second)
cancel() // 取消 context

time.Sleep(time.Second) // 等待 worker 退出
}

超时控制

func main() {
// 创建带超时的 context
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel() // 释放资源

select {
case <-time.After(2 * time.Second):
fmt.Println("操作完成")
case <-ctx.Done():
fmt.Println("超时:", ctx.Err()) // context deadline exceeded
}
}

截止时间

func main() {
// 设置截止时间
deadline := time.Now().Add(2 * time.Second)
ctx, cancel := context.WithDeadline(context.Background(), deadline)
defer cancel()

// 检查截止时间
d, ok := ctx.Deadline()
fmt.Println("Deadline:", d, "set:", ok)

select {
case <-ctx.Done():
fmt.Println("已到达截止时间")
}
}

传递值

func main() {
// 创建带值的 context
ctx := context.WithValue(context.Background(), "userID", 12345)
ctx = context.WithValue(ctx, "requestID", "abc-123")

// 获取值
if userID, ok := ctx.Value("userID").(int); ok {
fmt.Println("UserID:", userID)
}

if requestID, ok := ctx.Value("requestID").(string); ok {
fmt.Println("RequestID:", requestID)
}
}
注意

Context 的 Value 应该用于传递请求范围的数据,如 trace ID、用户 ID 等,不要用于传递可选参数或业务数据。

HTTP 请求中的 Context

func main() {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

req, err := http.NewRequestWithContext(ctx, "GET", "https://example.com", nil)
if err != nil {
log.Fatal(err)
}

resp, err := http.DefaultClient.Do(req)
if err != nil {
log.Fatal(err)
}
defer resp.Body.Close()

fmt.Println("Status:", resp.Status)
}

并发模式

Pipeline 模式

Pipeline 是一系列通过 channel 连接的处理阶段:

package main

import "fmt"

// 第一阶段:生成数据
func generate(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}

// 第二阶段:处理数据(平方)
func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}

func main() {
// 构建 pipeline
c := generate(2, 3)
out := square(c)

// 消费结果
for n := range out {
fmt.Println(n) // 4, 9
}

// 可以链式调用
for n := range square(square(generate(2, 3))) {
fmt.Println(n) // 16, 81
}
}

Fan-out / Fan-in 模式

Fan-out:多个 goroutine 从同一个 channel 读取数据,实现并行处理。 Fan-in:将多个 channel 的结果合并到一个 channel。

package main

import (
"fmt"
"sync"
)

// Fan-out:启动多个 worker 处理同一个输入 channel
func worker(id int, in <-chan int, out chan<- int) {
for n := range in {
fmt.Printf("Worker %d: processing %d\n", id, n)
out <- n * n
}
}

// Fan-in:合并多个 channel 到一个 channel
func merge(channels ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)

// 为每个输入 channel 启动一个 goroutine
output := func(c <-chan int) {
for n := range c {
out <- n
}
wg.Done()
}

wg.Add(len(channels))
for _, c := range channels {
go output(c)
}

// 等待所有输入 channel 关闭后,关闭输出 channel
go func() {
wg.Wait()
close(out)
}()

return out
}

func main() {
// 输入 channel
in := make(chan int, 10)

// 启动 3 个 worker
out1 := make(chan int)
out2 := make(chan int)
out3 := make(chan int)

go worker(1, in, out1)
go worker(2, in, out2)
go worker(3, in, out3)

// 发送数据
for i := 0; i < 10; i++ {
in <- i
}
close(in)

// 关闭输出 channel
go func() {
// 等待一段时间让 worker 处理完
time.Sleep(time.Second)
close(out1)
close(out2)
close(out3)
}()

// 合并结果
for n := range merge(out1, out2, out3) {
fmt.Println(n)
}
}

Worker Pool 模式

创建固定数量的 worker 来处理任务:

package main

import (
"fmt"
"sync"
"time"
)

type Job struct {
ID int
Data string
}

type Result struct {
Job Job
Result string
}

func worker(id int, jobs <-chan Job, results chan<- Result, wg *sync.WaitGroup) {
defer wg.Done()
for job := range jobs {
fmt.Printf("Worker %d: processing job %d\n", id, job.ID)
time.Sleep(time.Second) // 模拟处理
results <- Result{
Job: job,
Result: fmt.Sprintf("Processed: %s", job.Data),
}
}
}

func main() {
const numWorkers = 3
const numJobs = 10

jobs := make(chan Job, numJobs)
results := make(chan Result, numJobs)

var wg sync.WaitGroup

// 启动 worker
for i := 1; i <= numWorkers; i++ {
wg.Add(1)
go worker(i, jobs, results, &wg)
}

// 发送任务
for i := 1; i <= numJobs; i++ {
jobs <- Job{ID: i, Data: fmt.Sprintf("Task %d", i)}
}
close(jobs)

// 等待所有 worker 完成
go func() {
wg.Wait()
close(results)
}()

// 收集结果
for result := range results {
fmt.Printf("Result: Job %d -> %s\n", result.Job.ID, result.Result)
}
}

生产者-消费者模式

package main

import (
"fmt"
"math/rand"
"sync"
"time"
)

func producer(id int, ch chan<- int, wg *sync.WaitGroup) {
defer wg.Done()
for i := 0; i < 5; i++ {
num := rand.Intn(100)
ch <- num
fmt.Printf("Producer %d: produced %d\n", id, num)
time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond)
}
}

func consumer(id int, ch <-chan int, wg *sync.WaitGroup) {
defer wg.Done()
for num := range ch {
fmt.Printf("Consumer %d: consumed %d\n", id, num)
time.Sleep(time.Duration(rand.Intn(300)) * time.Millisecond)
}
}

func main() {
ch := make(chan int, 5)
var wg sync.WaitGroup

// 启动生产者
for i := 1; i <= 2; i++ {
wg.Add(1)
go producer(i, ch, &wg)
}

// 启动消费者
for i := 1; i <= 3; i++ {
wg.Add(1)
go consumer(i, ch, &wg)
}

// 等待生产者完成后关闭 channel
go func() {
wg.Wait()
close(ch)
}()

time.Sleep(5 * time.Second)
}

速率限制模式

使用 ticker 实现速率限制:

package main

import (
"fmt"
"time"
)

func main() {
requests := make(chan int, 10)
for i := 0; i < 10; i++ {
requests <- i
}
close(requests)

// 每 200ms 处理一个请求
limiter := time.Tick(200 * time.Millisecond)

for req := range requests {
<-limiter // 等待 ticker
fmt.Println("Request", req, time.Now().Format("15:04:05.000"))
}
}

等待超时模式

func waitForResult() (string, error) {
result := make(chan string, 1)

go func() {
time.Sleep(2 * time.Second) // 模拟耗时操作
result <- "success"
}()

select {
case res := <-result:
return res, nil
case <-time.After(time.Second):
return "", fmt.Errorf("timeout")
}
}

其他同步原语

sync.Once

确保某个操作只执行一次:

package main

import (
"fmt"
"sync"
)

type Singleton struct {
name string
}

var (
instance *Singleton
once sync.Once
)

func GetInstance() *Singleton {
once.Do(func() {
fmt.Println("Creating instance...")
instance = &Singleton{name: "Singleton"}
})
return instance
}

func main() {
var wg sync.WaitGroup

for i := 0; i < 5; i++ {
wg.Add(1)
go func() {
defer wg.Done()
inst := GetInstance()
fmt.Printf("Instance: %p\n", inst)
}()
}

wg.Wait()
// 输出:Creating instance... 只出现一次
// 所有 goroutine 获取的是同一个实例
}

sync.Cond

条件变量,用于等待某个条件成立:

package main

import (
"fmt"
"sync"
"time"
)

type Queue struct {
items []int
cond *sync.Cond
}

func NewQueue() *Queue {
return &Queue{
cond: sync.NewCond(&sync.Mutex{}),
}
}

func (q *Queue) Put(item int) {
q.cond.L.Lock()
defer q.cond.L.Unlock()

q.items = append(q.items, item)
fmt.Println("Produced:", item)
q.cond.Signal() // 通知等待的消费者
}

func (q *Queue) Get() int {
q.cond.L.Lock()
defer q.cond.L.Unlock()

// 等待队列不为空
for len(q.items) == 0 {
q.cond.Wait()
}

item := q.items[0]
q.items = q.items[1:]
fmt.Println("Consumed:", item)
return item
}

func main() {
queue := NewQueue()

// 消费者
go func() {
for i := 0; i < 5; i++ {
queue.Get()
}
}()

// 生产者
for i := 0; i < 5; i++ {
queue.Put(i)
time.Sleep(100 * time.Millisecond)
}

time.Sleep(time.Second)
}

sync.Pool

对象池,用于复用临时对象,减少内存分配:

package main

import (
"fmt"
"sync"
)

var pool = sync.Pool{
New: func() interface{} {
fmt.Println("Creating new object")
return make([]byte, 1024)
},
}

func main() {
// 从池中获取对象
data := pool.Get().([]byte)
fmt.Println("Got object, len:", len(data))

// 使用对象...

// 将对象放回池中
pool.Put(data)

// 再次获取,可能复用之前的对象
data2 := pool.Get().([]byte)
fmt.Println("Got object again, len:", len(data2))

// 输出:
// Creating new object(只有第一次)
// Got object, len: 1024
// Got object again, len: 1024
}

sync.Map

sync.Map 是并发安全的 map 实现,适用于读多写少的场景。与使用 sync.RWMutex 保护的普通 map 相比,sync.Map 在特定场景下有更好的性能表现。

Go 1.24 的改进

Go 1.24 对 sync.Map 的实现进行了重大改进,性能显著提升:

  • 对于修改不相交键集合的操作,在大型 map 上竞争更少
  • 不再需要"预热"时间就能达到低竞争的读取性能
  • 如果遇到问题,可以通过 GOEXPERIMENT=nosynchashtriemap 切换回旧实现
package main

import (
"fmt"
"sync"
)

func main() {
var m sync.Map

// 存储
m.Store("name", "张三")
m.Store("age", 25)

// 读取
if value, ok := m.Load("name"); ok {
fmt.Println("Name:", value)
}

// 读取或存储(如果不存在则存储)
value, loaded := m.LoadOrStore("city", "北京")
fmt.Println("City:", value, "loaded:", loaded)

// 遍历
m.Range(func(key, value interface{}) bool {
fmt.Printf("%v: %v\n", key, value)
return true
})

// 删除
m.Delete("age")

// 比较并删除(Go 1.20+)
m.CompareAndDelete("name", "张三")

// 比较并交换(Go 1.20+)
m.CompareAndSwap("age", 25, 30)
}

适用场景

场景推荐使用原因
读多写少sync.Map针对此场景优化
键集合稳定sync.Map避免频繁增删键
需要并发访问sync.Map内置并发安全
频繁写入map + RWMutexsync.Map 对写场景优化较少
键集合频繁变化map + RWMutex频繁增删会影响性能

sync.Map 与普通 map + RWMutex 的对比

// 方式一:sync.Map
var sm sync.Map
sm.Store("key", "value")
v, ok := sm.Load("key")

// 方式二:map + RWMutex
var (
m = make(map[string]string)
mu sync.RWMutex
)
mu.Lock()
m["key"] = "value"
mu.Unlock()
mu.RLock()
v, ok := m["key"]
mu.RUnlock()

sync.Map 的优势在于它内部使用了更复杂的分片机制,在读多写少的场景下可以减少锁竞争。但在写入频繁的场景下,普通的 map 配合 RWMutex 可能更简单高效。

常见问题与陷阱

死锁

死锁是指两个或多个 goroutine 相互等待,导致都无法继续执行:

// ❌ 死锁示例 1:无缓冲 channel 只有发送没有接收
func deadlock1() {
ch := make(chan int)
ch <- 42 // 阻塞,没有接收者
}

// ❌ 死锁示例 2:循环等待
func deadlock2() {
ch1 := make(chan int)
ch2 := make(chan int)

go func() {
<-ch1 // 等待 ch1
ch2 <- 1
}()

ch1 <- 1 // 等待被接收
<-ch2 // 等待 ch2
}

// ✅ 正确:使用缓冲 channel
func correct() {
ch := make(chan int, 1)
ch <- 42
fmt.Println(<-ch)
}

Goroutine 泄露

Goroutine 泄露是指 goroutine 永远不会退出,持续占用资源:

// ❌ 泄露示例:goroutine 永远阻塞
func leak() {
ch := make(chan int)
go func() {
<-ch // 永远没有人发送数据
}()
}

// ✅ 正确:使用 context 或 done channel
func noLeak() {
ch := make(chan int)
done := make(chan struct{})

go func() {
select {
case <-ch:
// 处理数据
case <-done:
return // 可以退出
}
}()

close(done) // 通知 goroutine 退出
}

竞态条件

多个 goroutine 同时访问共享数据且至少有一个是写操作:

// ❌ 竞态条件
var counter int

func raceCondition() {
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
counter++ // 竞态!
}()
}
wg.Wait()
fmt.Println(counter) // 可能不是 1000
}

// ✅ 正确:使用 mutex 或 atomic
var (
counterSafe int
mu sync.Mutex
)

func noRaceCondition() {
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
mu.Lock()
counterSafe++
mu.Unlock()
}()
}
wg.Wait()
fmt.Println(counterSafe) // 一定是 1000
}

可以使用 go run -race 检测竞态条件。

优雅关闭模式

在生产环境中,服务需要能够优雅关闭,等待所有正在处理的请求完成后再退出。

HTTP 服务优雅关闭

package main

import (
"context"
"fmt"
"net/http"
"os"
"os/signal"
"syscall"
"time"
)

func main() {
mux := http.NewServeMux()
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
time.Sleep(2 * time.Second) // 模拟处理
fmt.Fprintln(w, "Hello")
})

server := &http.Server{
Addr: ":8080",
Handler: mux,
}

// 启动服务器
go func() {
if err := server.ListenAndServe(); err != http.ErrServerClosed {
fmt.Printf("服务器错误: %v\n", err)
}
}()
fmt.Println("服务器启动在 :8080")

// 等待中断信号
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
fmt.Println("正在关闭服务器...")

// 优雅关闭,最多等待 30 秒
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

if err := server.Shutdown(ctx); err != nil {
fmt.Printf("服务器关闭错误: %v\n", err)
}
fmt.Println("服务器已关闭")
}

Worker 优雅关闭

func gracefulWorkerPool(numWorkers int, jobs <-chan Job) {
var wg sync.WaitGroup
ctx, cancel := context.WithCancel(context.Background())

// 启动 workers
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
worker(ctx, id, jobs)
}(i)
}

// 等待关闭信号
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit

fmt.Println("正在停止 workers...")
cancel() // 通知所有 workers 停止

// 等待所有 workers 完成当前任务
wg.Wait()
fmt.Println("所有 workers 已停止")
}

func worker(ctx context.Context, id int, jobs <-chan Job) {
for {
select {
case <-ctx.Done():
fmt.Printf("Worker %d 收到停止信号\n", id)
return
case job, ok := <-jobs:
if !ok {
return
}
processJob(job)
}
}
}

errgroup

golang.org/x/sync/errgroup 包提供了更方便的并发控制方式,特别适合需要并行执行多个任务并在任一失败时取消的场景。

package main

import (
"context"
"fmt"
"net/http"
"time"

"golang.org/x/sync/errgroup"
)

func main() {
g, ctx := errgroup.WithContext(context.Background())

urls := []string{
"https://www.google.com",
"https://www.github.com",
"https://invalid.url", // 会失败
}

for _, url := range urls {
url := url // 捕获变量
g.Go(func() error {
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
return err
}

resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()

fmt.Printf("URL: %s, Status: %d\n", url, resp.StatusCode)
return nil
})
}

// 等待所有 goroutine 完成
if err := g.Wait(); err != nil {
fmt.Printf("发生错误: %v\n", err)
}
fmt.Println("所有请求完成")
}

errgroup 限制并发数

func fetchAll(ctx context.Context, urls []string) error {
g, ctx := errgroup.WithContext(ctx)

// 限制最多 5 个并发
g.SetLimit(5)

for _, url := range urls {
url := url
g.Go(func() error {
return fetchURL(ctx, url)
})
}

return g.Wait()
}

testing/synctest:并发测试(Go 1.24实验性)

Go 1.24 引入了实验性的 testing/synctest 包,专门用于测试并发代码。这个包解决了并发测试中常见的时序问题和不确定性。

为什么需要 synctest?

测试并发代码的挑战:

  1. 时序不确定:goroutine 的执行顺序不可预测
  2. 时间等待:使用 time.Sleep 会让测试变慢且不稳定
  3. 竞态条件:测试可能因时序不同而产生不同结果

synctest 通过提供一个"气泡"环境来解决这些问题,在这个环境中:

  • 时间包函数操作的是假时钟
  • 可以等待所有 goroutine 阻塞

启用 synctest

由于是实验性功能,必须通过环境变量启用:

# 运行测试时启用
GOEXPERIMENT=synctest go test

# 或在构建时设置
GOEXPERIMENT=synctest go build

基本用法

synctest.Run:创建隔离的测试气泡

package main

import (
"testing"
"testing/synctest"
"time"
)

func TestConcurrentTimer(t *testing.T) {
synctest.Run(func() {
// 在这个气泡内,time 包使用假时钟
done := make(chan time.Time)

go func() {
// 这个 time.After 使用假时钟
done <- <-time.After(5 * time.Second)
}()

// 等待 goroutine 阻塞
synctest.Wait()

// 现在可以安全地验证状态
select {
case <-done:
t.Error("timer should not have fired yet")
default:
// 预期:计时器还未触发
}
})
}

synctest.Wait:等待所有 goroutine 阻塞

synctest.Wait() 会阻塞直到当前气泡内的所有 goroutine 都处于阻塞状态(等待 channel 操作、锁、time.Sleep 等)。

func TestProducerConsumer(t *testing.T) {
synctest.Run(func() {
ch := make(chan int, 1)
results := make([]int, 0)

// 生产者
go func() {
ch <- 1
ch <- 2
close(ch)
}()

// 消费者
go func() {
for v := range ch {
results = append(results, v)
}
}()

// 等待所有 goroutine 完成工作并阻塞
synctest.Wait()

// 现在可以安全检查结果
if len(results) != 2 {
t.Errorf("expected 2 results, got %d", len(results))
}
})
}

测试超时逻辑

synctest 特别适合测试超时和计时器相关的代码:

func TestTimeout(t *testing.T) {
synctest.Run(func() {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()

done := make(chan struct{})

go func() {
defer close(done)

select {
case <-ctx.Done():
// 超时处理
case <-time.After(2 * time.Second):
t.Error("should have timed out first")
}
}()

// 等待 goroutine 阻塞
synctest.Wait()

// 验证 context 已过期
if ctx.Err() != context.DeadlineExceeded {
t.Errorf("expected DeadlineExceeded, got %v", ctx.Err())
}
})
}

测试重试逻辑

func TestRetryWithBackoff(t *testing.T) {
synctest.Run(func() {
attempts := 0

err := retryWithBackoff(3, func() error {
attempts++
if attempts < 3 {
return errors.New("not yet")
}
return nil
})

if err != nil {
t.Errorf("expected success, got %v", err)
}
if attempts != 3 {
t.Errorf("expected 3 attempts, got %d", attempts)
}
})
}

func retryWithBackoff(maxAttempts int, fn func() error) error {
backoff := 100 * time.Millisecond

for i := 0; i < maxAttempts; i++ {
if err := fn(); err == nil {
return nil
}

if i < maxAttempts-1 {
time.Sleep(backoff)
backoff *= 2
}
}
return errors.New("max attempts exceeded")
}

完整示例:测试并发缓存

package cache

import (
"testing"
"testing/synctest"
"time"
)

type Cache struct {
data map[string]string
fresh map[string]time.Time
ttl time.Duration
}

func NewCache(ttl time.Duration) *Cache {
return &Cache{
data: make(map[string]string),
fresh: make(map[string]time.Time),
ttl: ttl,
}
}

func (c *Cache) Get(key string) (string, bool) {
if freshTime, ok := c.fresh[key]; ok {
if time.Since(freshTime) > c.ttl {
return "", false // 已过期
}
}
v, ok := c.data[key]
return v, ok
}

func (c *Cache) Set(key, value string) {
c.data[key] = value
c.fresh[key] = time.Now()
}

func TestCacheExpiration(t *testing.T) {
synctest.Run(func() {
cache := NewCache(1 * time.Second)

// 设置缓存
cache.Set("key", "value")

// 立即获取应该成功
if v, ok := cache.Get("key"); !ok || v != "value" {
t.Error("expected to get value immediately")
}

// 等待 TTL 过期
time.Sleep(2 * time.Second)
synctest.Wait()

// 过期后应该获取失败
if _, ok := cache.Get("key"); ok {
t.Error("expected cache to be expired")
}
})
}

注意事项

  1. 实验性:API 可能会在未来的 Go 版本中变化
  2. 构建标签:必须通过 -tags=synctestGOEXPERIMENT=synctest 启用
  3. 气泡隔离:每个 synctest.Run 创建独立的气泡,气泡间的 goroutine 互不影响
  4. 时间操作:气泡内所有 time 包函数使用假时钟

与传统方法的对比

方法优点缺点
time.Sleep简单测试慢、不稳定
mock 时间灵活需要额外代码、复杂
synctest原生支持、快速、确定实验性、API 可能变化

最佳实践

1. 使用 defer 关闭资源

func processFile(filename string) error {
file, err := os.Open(filename)
if err != nil {
return err
}
defer file.Close() // 确保文件关闭

// 处理文件...
return nil
}

2. 正确处理 goroutine 启动

// ❌ 错误:在 goroutine 内部调用 wg.Add
go func() {
wg.Add(1) // 可能在 wg.Wait() 之后才执行
defer wg.Done()
// ...
}()

// ✅ 正确:在启动 goroutine 前调用 wg.Add
wg.Add(1)
go func() {
defer wg.Done()
// ...
}()

3. 使用 context 进行取消

func operation(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
// 执行操作
}
return nil
}

4. 避免 goroutine 泄露

func worker(ctx context.Context, jobs <-chan Job) {
for {
select {
case job, ok := <-jobs:
if !ok {
return // channel 已关闭
}
process(job)
case <-ctx.Done():
return // 收到取消信号
}
}
}

5. 合理使用缓冲 channel

// 已知数量的数据:使用适当大小的缓冲
ch := make(chan int, 100)

// 未知数量的数据:使用无缓冲或小缓冲
ch := make(chan int) // 或 make(chan int, 1)

6. 使用 panic 保护

func safeGo(fn func()) {
go func() {
defer func() {
if r := recover(); r != nil {
fmt.Printf("goroutine panic: %v\n%s\n", r, debug.Stack())
}
}()
fn()
}()
}

// 使用
safeGo(func() {
// 可能 panic 的代码
panic("oops")
})
// 程序不会崩溃

小结

本章我们学习了:

  1. Goroutine:轻量级并发执行单元,使用 go 关键字启动
  2. Channel:goroutine 之间的通信机制,分为有缓冲和无缓冲
  3. Select:多路复用,同时监听多个 channel
  4. sync 包:Mutex、RWMutex、WaitGroup、Once、Cond、Pool、Map
  5. atomic 包:原子操作,用于无锁并发编程
  6. Context:传递取消信号、超时和请求范围的值
  7. 并发模式:Pipeline、Fan-out/Fan-in、Worker Pool、生产者-消费者
  8. 常见问题:死锁、泄露、竞态条件

练习

  1. 创建一个并发的下载器,使用 Worker Pool 模式限制并发数
  2. 使用 Pipeline 模式实现数据处理管道(过滤 -> 转换 -> 汇总)
  3. 使用 Context 实现一个带超时的 HTTP 请求客户端
  4. 实现"哲学家就餐"问题,避免死锁
  5. 使用 sync.Map 实现一个并发安全的缓存系统