Go 并发编程
本章将介绍 Go 语言强大的并发编程能力。Go 从语言层面原生支持并发,通过 goroutine 和 channel 使得并发编程变得简单而高效。
并发与并行
在开始学习 Go 的并发特性之前,我们需要理解两个重要概念:
- 并发(Concurrency):是指同时处理多个任务的能力。在单核 CPU 上,通过时间片轮转的方式交替执行多个任务,从宏观上看像是同时进行的。
- 并行(Parallelism):是指同时执行多个任务的能力。需要多核 CPU 支持,每个核心真正同时执行不同的任务。
并发关注的是结构,即如何组织程序来处理多个任务;并行关注的是执行,即如何同时运行多个任务。Go 的并发模型可以让程序在单核上实现并发,在多核上实现并行。
Go 的并发模型
Go 采用了 CSP(Communicating Sequential Processes,通信顺序进程) 模型,核心理念是:
"不要通过共享内存来通信,而要通过通信来共享内存。"
这意味着在 Go 中,goroutine 之间通过 channel 传递数据来通信,而不是通过共享变量并加锁来同步。
Go 调度器:GMP 模型
Go 运行时实现了自己的调度器,称为 GMP 模型:
- G(Goroutine):用户态的轻量级线程,初始栈大小只有 2KB
- M(Machine):操作系统线程,真正执行代码的载体
- P(Processor):逻辑处理器,负责管理和调度 G
调度过程:
- 每个 P 维护一个可运行的 G 队列(本地队列)
- M 绑定到 P 后,从 P 的队列中获取 G 执行
- 当 G 阻塞(如系统调用)时,M 会释放 P,让其他 M 绑定到这个 P
- 全局队列存放等待执行的 G,P 会定期从全局队列获取 G
全局队列 (Global Queue)
┌─────────────────────────────┐
│ G1 G2 G3 G4 G5 ... │
└─────────────┬───────────────┘
│
┌─────────────┼─────────────┐
│ │ │
▼ ▼ ▼
┌───────┐ ┌───────┐ ┌───────┐
│ P1 │ │ P2 │ │ P3 │
│ 本地 │ │ 本地 │ │ 本地 │
│ 队列 │ │ 队列 │ │ 队列 │
│ G6 G7 │ │ G8 G9 │ │ G10 │
└───┬───┘ └───┬───┘ └───┬───┘
│ │ │
▼ ▼ ▼
┌───────┐ ┌───────┐ ┌───────┐
│ M1 │ │ M2 │ │ M3 │
└───────┘ └───────┘ └───────┘
理解 GMP 模型有助于:
- 合理设置
GOMAXPROCS(默认为 CPU 核数) - 理解 goroutine 为什么如此轻量
- 分析并发程序的性能问题
Goroutine
Goroutine 是 Go 中轻量级的并发执行单元。它比操作系统线程更轻量,初始栈大小只有 2KB(可以动态伸缩),而线程通常需要 1MB 以上的栈空间。
创建 Goroutine
使用 go 关键字启动一个 goroutine:
package main
import (
"fmt"
"time"
)
func sayHello() {
fmt.Println("Hello from goroutine!")
}
func main() {
// 启动 goroutine
go sayHello()
fmt.Println("Hello from main!")
// 等待 goroutine 执行完成
time.Sleep(time.Second)
}
输出顺序可能为:
Hello from main!
Hello from goroutine!
main 函数本身也是一个 goroutine。当 main 函数返回时,所有其他 goroutine 都会被立即终止,不管它们是否执行完成。这就是为什么上面代码需要 time.Sleep 来等待 goroutine 完成。
匿名函数 Goroutine
可以使用匿名函数创建 goroutine:
func main() {
// 方式一:立即执行的匿名函数
go func() {
fmt.Println("匿名 goroutine")
}()
// 方式二:带参数的匿名函数
message := "Hello"
go func(msg string) {
fmt.Println(msg)
}(message)
time.Sleep(time.Second)
}
闭包变量捕获问题
在使用闭包时需要注意变量捕获的问题:
// ❌ 错误示例:所有 goroutine 可能都打印 3
for i := 0; i < 3; i++ {
go func() {
fmt.Println(i) // 捕获的是变量 i 的引用
}()
}
// ✅ 正确示例:将 i 作为参数传递
for i := 0; i < 3; i++ {
go func(n int) {
fmt.Println(n) // 每个 goroutine 有自己的 n 副本
}(i)
}
使用 sync.WaitGroup 等待
使用 time.Sleep 等待 goroutine 完成是不推荐的,应该使用 sync.WaitGroup:
package main
import (
"fmt"
"sync"
"time"
)
func worker(id int, wg *sync.WaitGroup) {
// 延迟调用 Done,确保函数退出时通知 WaitGroup
defer wg.Done()
fmt.Printf("Worker %d starting\n", id)
time.Sleep(time.Second)
fmt.Printf("Worker %d done\n", id)
}
func main() {
var wg sync.WaitGroup
// 启动 5 个 goroutine
for i := 1; i <= 5; i++ {
// 在启动 goroutine 前增加计数器
wg.Add(1)
go worker(i, &wg)
}
// 等待所有 goroutine 完成
wg.Wait()
fmt.Println("All workers done")
}
WaitGroup 的三个方法:
Add(delta int):增加计数器,通常在启动 goroutine 前调用Done():减少计数器,等价于Add(-1)Wait():阻塞直到计数器归零
wg.Add(1) 应该在启动 goroutine 之前调用,而不是在 goroutine 内部调用,以避免竞态条件。
Channel
Channel(通道)是 goroutine 之间通信的管道。通过 channel,一个 goroutine 可以发送值给另一个 goroutine。
创建 Channel
使用 make 创建 channel:
// 无缓冲 channel
ch := make(chan int)
// 有缓冲 channel,容量为 10
ch := make(chan int, 10)
发送和接收
使用 <- 操作符发送和接收数据:
ch := make(chan int)
// 发送数据
ch <- 42
// 接收数据
value := <-ch
// 接收数据并检查 channel 是否已关闭
value, ok := <-ch
// 如果 ok 为 false,表示 channel 已关闭且没有更多数据
无缓冲 Channel
无缓冲 channel 也称为同步 channel,发送操作会阻塞直到另一个 goroutine 执行接收操作:
func main() {
ch := make(chan string)
// 发送 goroutine
go func() {
ch <- "Hello" // 阻塞,直到有人接收
fmt.Println("消息已发送")
}()
// 接收
msg := <-ch // 阻塞,直到有数据可接收
fmt.Println(msg)
}
特点:
- 发送和接收操作是同步的
- 每次发送必须有对应的接收,否则会死锁
- 适合用于 goroutine 间的同步
有缓冲 Channel
有缓冲 channel 有一个固定大小的缓冲区,发送操作只有在缓冲区满时才会阻塞:
func main() {
// 创建容量为 2 的缓冲 channel
ch := make(chan int, 2)
// 可以连续发送两个值而不阻塞
ch <- 1
ch <- 2
// 第三个发送会阻塞(缓冲区已满)
// ch <- 3 // 死锁!
// 接收数据
fmt.Println(<-ch) // 1
fmt.Println(<-ch) // 2
}
特点:
- 发送操作在缓冲区未满时不会阻塞
- 接收操作在缓冲区不为空时不会阻塞
- 适合用于生产者-消费者模式
关闭 Channel
使用 close 函数关闭 channel:
ch := make(chan int, 3)
ch <- 1
ch <- 2
close(ch) // 关闭 channel
// 从已关闭的 channel 接收
value, ok := <-ch
fmt.Println(value, ok) // 1 true
value, ok = <-ch
fmt.Println(value, ok) // 2 true
value, ok = <-ch
fmt.Println(value, ok) // 0 false(channel 已空且关闭)
关闭 channel 的规则:
- 只有发送者应该关闭 channel,接收者不应该关闭
- 向已关闭的 channel 发送数据会 panic
- 从已关闭的 channel 接收会返回零值和
false - 关闭一个已关闭的 channel 会 panic
- 关闭 nil channel 会 panic
使用 range 遍历 Channel
可以使用 for range 持续从 channel 接收数据,直到 channel 关闭:
func main() {
ch := make(chan int, 5)
// 发送数据
go func() {
for i := 0; i < 5; i++ {
ch <- i
}
close(ch) // 发送完毕后关闭
}()
// 接收数据直到 channel 关闭
for value := range ch {
fmt.Println(value)
}
fmt.Println("Channel 已关闭")
}
单向 Channel
Go 支持单向 channel 类型,用于限制 channel 的操作方向:
// 只发送 channel
func sender(ch chan<- int) {
ch <- 42
// <-ch // 编译错误!不能从只发送 channel 接收
}
// 只接收 channel
func receiver(ch <-chan int) {
value := <-ch
fmt.Println(value)
// ch <- 1 // 编译错误!不能向只接收 channel 发送
}
func main() {
ch := make(chan int)
go sender(ch)
go receiver(ch)
}
类型转换:
ch := make(chan int) // 双向 channel
var sendCh chan<- int = ch // 转换为只发送
var recvCh <-chan int = ch // 转换为只接收
Select
select 语句用于同时等待多个 channel 操作,类似于网络编程中的 select/poll/epoll。
基本用法
func main() {
ch1 := make(chan string)
ch2 := make(chan string)
go func() {
time.Sleep(time.Second)
ch1 <- "from ch1"
}()
go func() {
time.Sleep(500 * time.Millisecond)
ch2 <- "from ch2"
}()
// 等待最先到达的数据
select {
case msg1 := <-ch1:
fmt.Println("Received", msg1)
case msg2 := <-ch2:
fmt.Println("Received", msg2)
}
}
超时处理
使用 time.After 实现超时:
func main() {
ch := make(chan string)
go func() {
time.Sleep(2 * time.Second)
ch <- "result"
}()
select {
case res := <-ch:
fmt.Println(res)
case <-time.After(time.Second):
fmt.Println("Timeout!")
}
}
非阻塞操作
使用 default 实现非阻塞操作:
func main() {
ch := make(chan int, 1)
select {
case ch <- 42:
fmt.Println("发送成功")
default:
fmt.Println("发送失败(channel 满)")
}
select {
case value := <-ch:
fmt.Println("接收:", value)
default:
fmt.Println("接收失败(channel 空)")
}
}
检查 Channel 是否已关闭
func main() {
ch := make(chan int, 1)
close(ch)
select {
case value, ok := <-ch:
if ok {
fmt.Println("接收到:", value)
} else {
fmt.Println("Channel 已关闭")
}
default:
fmt.Println("没有数据")
}
}
并发安全
当多个 goroutine 同时访问共享资源时,需要使用同步机制来保证数据安全。
sync.Mutex
互斥锁用于保护临界区,同一时间只有一个 goroutine 可以访问:
package main
import (
"fmt"
"sync"
)
type Counter struct {
mu sync.Mutex
value int
}
func (c *Counter) Increment() {
c.mu.Lock() // 获取锁
defer c.mu.Unlock() // 释放锁
c.value++
}
func (c *Counter) Value() int {
c.mu.Lock()
defer c.mu.Unlock()
return c.value
}
func main() {
counter := Counter{}
var wg sync.WaitGroup
// 启动 1000 个 goroutine
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
counter.Increment()
}()
}
wg.Wait()
fmt.Println("Counter:", counter.Value()) // 1000
}
sync.RWMutex
读写锁允许多个读者同时读取,但写者需要独占访问:
package main
import (
"fmt"
"sync"
"time"
)
type Cache struct {
mu sync.RWMutex
data map[string]string
}
func (c *Cache) Get(key string) string {
c.mu.RLock() // 读锁
defer c.mu.RUnlock()
return c.data[key]
}
func (c *Cache) Set(key, value string) {
c.mu.Lock() // 写锁
defer c.mu.Unlock()
c.data[key] = value
}
func main() {
cache := &Cache{
data: make(map[string]string),
}
// 写操作
go func() {
cache.Set("key", "value")
}()
// 多个读操作可以同时进行
for i := 0; i < 3; i++ {
go func() {
fmt.Println(cache.Get("key"))
}()
}
}
读写锁的规则:
- 读锁可以同时被多个 goroutine 获取
- 写锁是独占的,只能被一个 goroutine 获取
- 如果有写锁存在,读锁会被阻塞
- 如果有读锁存在,写锁会被阻塞
sync/atomic
Go 提供了原子操作包,用于对基本类型进行无锁并发操作:
package main
import (
"fmt"
"sync"
"sync/atomic"
)
func main() {
var counter int64
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
atomic.AddInt64(&counter, 1) // 原子加操作
}()
}
wg.Wait()
fmt.Println("Counter:", counter) // 1000
// 比较并交换(Compare And Swap)
old := atomic.LoadInt64(&counter)
swapped := atomic.CompareAndSwapInt64(&counter, old, 0)
fmt.Println("Swapped:", swapped) // true
fmt.Println("Counter:", counter) // 0
}
常用的原子操作:
Add:原子加法Load:原子读取Store:原子存储Swap:原子交换CompareAndSwap:比较并交换
atomic.Int64(Go 1.19+)
Go 1.19 引入了新的原子类型,使用更加方便:
package main
import (
"fmt"
"sync"
"sync/atomic"
)
func main() {
var counter atomic.Int64
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
counter.Add(1)
}()
}
wg.Wait()
fmt.Println("Counter:", counter.Load()) // 1000
// 原子交换
old := counter.Swap(0)
fmt.Println("Old value:", old) // 1000
fmt.Println("New value:", counter.Load()) // 0
// 比较并交换
swapped := counter.CompareAndSwap(0, 42)
fmt.Println("Swapped:", swapped) // true
fmt.Println("Value:", counter.Load()) // 42
}
Context
Context 包用于在 API 边界和进程之间传递截止时间、取消信号和其他请求范围的值。
创建 Context
import "context"
// 背景 context,作为根 context
ctx := context.Background()
// TODO context,用于不确定使用哪个 context 时
ctx := context.TODO()
取消信号
package main
import (
"context"
"fmt"
"time"
)
func worker(ctx context.Context) {
for {
select {
case <-ctx.Done():
fmt.Println("Worker stopped:", ctx.Err())
return
default:
fmt.Println("Working...")
time.Sleep(500 * time.Millisecond)
}
}
}
func main() {
// 创建可取消的 context
ctx, cancel := context.WithCancel(context.Background())
go worker(ctx)
time.Sleep(2 * time.Second)
cancel() // 取消 context
time.Sleep(time.Second) // 等待 worker 退出
}
超时控制
func main() {
// 创建带超时的 context
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel() // 释放资源
select {
case <-time.After(2 * time.Second):
fmt.Println("操作完成")
case <-ctx.Done():
fmt.Println("超时:", ctx.Err()) // context deadline exceeded
}
}
截止时间
func main() {
// 设置截止时间
deadline := time.Now().Add(2 * time.Second)
ctx, cancel := context.WithDeadline(context.Background(), deadline)
defer cancel()
// 检查截止时间
d, ok := ctx.Deadline()
fmt.Println("Deadline:", d, "set:", ok)
select {
case <-ctx.Done():
fmt.Println("已到达截止时间")
}
}
传递值
func main() {
// 创建带值的 context
ctx := context.WithValue(context.Background(), "userID", 12345)
ctx = context.WithValue(ctx, "requestID", "abc-123")
// 获取值
if userID, ok := ctx.Value("userID").(int); ok {
fmt.Println("UserID:", userID)
}
if requestID, ok := ctx.Value("requestID").(string); ok {
fmt.Println("RequestID:", requestID)
}
}
Context 的 Value 应该用于传递请求范围的数据,如 trace ID、用户 ID 等,不要用于传递可选参数或业务数据。
HTTP 请求中的 Context
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
req, err := http.NewRequestWithContext(ctx, "GET", "https://example.com", nil)
if err != nil {
log.Fatal(err)
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
log.Fatal(err)
}
defer resp.Body.Close()
fmt.Println("Status:", resp.Status)
}
并发模式
Pipeline 模式
Pipeline 是一系列通过 channel 连接的处理阶段:
package main
import "fmt"
// 第一阶段:生成数据
func generate(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}
// 第二阶段:处理数据(平方)
func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}
func main() {
// 构建 pipeline
c := generate(2, 3)
out := square(c)
// 消费结果
for n := range out {
fmt.Println(n) // 4, 9
}
// 可以链式调用
for n := range square(square(generate(2, 3))) {
fmt.Println(n) // 16, 81
}
}
Fan-out / Fan-in 模式
Fan-out:多个 goroutine 从同一个 channel 读取数据,实现并行处理。 Fan-in:将多个 channel 的结果合并到一个 channel。
package main
import (
"fmt"
"sync"
)
// Fan-out:启动多个 worker 处理同一个输入 channel
func worker(id int, in <-chan int, out chan<- int) {
for n := range in {
fmt.Printf("Worker %d: processing %d\n", id, n)
out <- n * n
}
}
// Fan-in:合并多个 channel 到一个 channel
func merge(channels ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
// 为每个输入 channel 启动一个 goroutine
output := func(c <-chan int) {
for n := range c {
out <- n
}
wg.Done()
}
wg.Add(len(channels))
for _, c := range channels {
go output(c)
}
// 等待所有输入 channel 关闭后,关闭输出 channel
go func() {
wg.Wait()
close(out)
}()
return out
}
func main() {
// 输入 channel
in := make(chan int, 10)
// 启动 3 个 worker
out1 := make(chan int)
out2 := make(chan int)
out3 := make(chan int)
go worker(1, in, out1)
go worker(2, in, out2)
go worker(3, in, out3)
// 发送数据
for i := 0; i < 10; i++ {
in <- i
}
close(in)
// 关闭输出 channel
go func() {
// 等待一段时间让 worker 处理完
time.Sleep(time.Second)
close(out1)
close(out2)
close(out3)
}()
// 合并结果
for n := range merge(out1, out2, out3) {
fmt.Println(n)
}
}
Worker Pool 模式
创建固定数量的 worker 来处理任务:
package main
import (
"fmt"
"sync"
"time"
)
type Job struct {
ID int
Data string
}
type Result struct {
Job Job
Result string
}
func worker(id int, jobs <-chan Job, results chan<- Result, wg *sync.WaitGroup) {
defer wg.Done()
for job := range jobs {
fmt.Printf("Worker %d: processing job %d\n", id, job.ID)
time.Sleep(time.Second) // 模拟处理
results <- Result{
Job: job,
Result: fmt.Sprintf("Processed: %s", job.Data),
}
}
}
func main() {
const numWorkers = 3
const numJobs = 10
jobs := make(chan Job, numJobs)
results := make(chan Result, numJobs)
var wg sync.WaitGroup
// 启动 worker
for i := 1; i <= numWorkers; i++ {
wg.Add(1)
go worker(i, jobs, results, &wg)
}
// 发送任务
for i := 1; i <= numJobs; i++ {
jobs <- Job{ID: i, Data: fmt.Sprintf("Task %d", i)}
}
close(jobs)
// 等待所有 worker 完成
go func() {
wg.Wait()
close(results)
}()
// 收集结果
for result := range results {
fmt.Printf("Result: Job %d -> %s\n", result.Job.ID, result.Result)
}
}
生产者-消费者模式
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
func producer(id int, ch chan<- int, wg *sync.WaitGroup) {
defer wg.Done()
for i := 0; i < 5; i++ {
num := rand.Intn(100)
ch <- num
fmt.Printf("Producer %d: produced %d\n", id, num)
time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond)
}
}
func consumer(id int, ch <-chan int, wg *sync.WaitGroup) {
defer wg.Done()
for num := range ch {
fmt.Printf("Consumer %d: consumed %d\n", id, num)
time.Sleep(time.Duration(rand.Intn(300)) * time.Millisecond)
}
}
func main() {
ch := make(chan int, 5)
var wg sync.WaitGroup
// 启动生产者
for i := 1; i <= 2; i++ {
wg.Add(1)
go producer(i, ch, &wg)
}
// 启动消费者
for i := 1; i <= 3; i++ {
wg.Add(1)
go consumer(i, ch, &wg)
}
// 等待生产者完成后关闭 channel
go func() {
wg.Wait()
close(ch)
}()
time.Sleep(5 * time.Second)
}
速率限制模式
使用 ticker 实现速率限制:
package main
import (
"fmt"
"time"
)
func main() {
requests := make(chan int, 10)
for i := 0; i < 10; i++ {
requests <- i
}
close(requests)
// 每 200ms 处理一个请求
limiter := time.Tick(200 * time.Millisecond)
for req := range requests {
<-limiter // 等待 ticker
fmt.Println("Request", req, time.Now().Format("15:04:05.000"))
}
}
等待超时模式
func waitForResult() (string, error) {
result := make(chan string, 1)
go func() {
time.Sleep(2 * time.Second) // 模拟耗时操作
result <- "success"
}()
select {
case res := <-result:
return res, nil
case <-time.After(time.Second):
return "", fmt.Errorf("timeout")
}
}
其他同步原语
sync.Once
确保某个操作只执行一次:
package main
import (
"fmt"
"sync"
)
type Singleton struct {
name string
}
var (
instance *Singleton
once sync.Once
)
func GetInstance() *Singleton {
once.Do(func() {
fmt.Println("Creating instance...")
instance = &Singleton{name: "Singleton"}
})
return instance
}
func main() {
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func() {
defer wg.Done()
inst := GetInstance()
fmt.Printf("Instance: %p\n", inst)
}()
}
wg.Wait()
// 输出:Creating instance... 只出现一次
// 所有 goroutine 获取的是同一个实例
}
sync.Cond
条件变量,用于等待某个条件成立:
package main
import (
"fmt"
"sync"
"time"
)
type Queue struct {
items []int
cond *sync.Cond
}
func NewQueue() *Queue {
return &Queue{
cond: sync.NewCond(&sync.Mutex{}),
}
}
func (q *Queue) Put(item int) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
q.items = append(q.items, item)
fmt.Println("Produced:", item)
q.cond.Signal() // 通知等待的消费者
}
func (q *Queue) Get() int {
q.cond.L.Lock()
defer q.cond.L.Unlock()
// 等待队列不为空
for len(q.items) == 0 {
q.cond.Wait()
}
item := q.items[0]
q.items = q.items[1:]
fmt.Println("Consumed:", item)
return item
}
func main() {
queue := NewQueue()
// 消费者
go func() {
for i := 0; i < 5; i++ {
queue.Get()
}
}()
// 生产者
for i := 0; i < 5; i++ {
queue.Put(i)
time.Sleep(100 * time.Millisecond)
}
time.Sleep(time.Second)
}
sync.Pool
对象池,用于复用临时对象,减少内存分配:
package main
import (
"fmt"
"sync"
)
var pool = sync.Pool{
New: func() interface{} {
fmt.Println("Creating new object")
return make([]byte, 1024)
},
}
func main() {
// 从池中获取对象
data := pool.Get().([]byte)
fmt.Println("Got object, len:", len(data))
// 使用对象...
// 将对象放回池中
pool.Put(data)
// 再次获取,可能复用之前的对象
data2 := pool.Get().([]byte)
fmt.Println("Got object again, len:", len(data2))
// 输出:
// Creating new object(只有第一次)
// Got object, len: 1024
// Got object again, len: 1024
}
sync.Map
sync.Map 是并发安全的 map 实现,适用于读多写少的场景。与使用 sync.RWMutex 保护的普通 map 相比,sync.Map 在特定场景下有更好的性能表现。
Go 1.24 的改进:
Go 1.24 对 sync.Map 的实现进行了重大改进,性能显著提升:
- 对于修改不相交键集合的操作,在大型 map 上竞争更少
- 不再需要"预热"时间就能达到低竞争的读取性能
- 如果遇到问题,可以通过
GOEXPERIMENT=nosynchashtriemap切换回旧实现
package main
import (
"fmt"
"sync"
)
func main() {
var m sync.Map
// 存储
m.Store("name", "张三")
m.Store("age", 25)
// 读取
if value, ok := m.Load("name"); ok {
fmt.Println("Name:", value)
}
// 读取或存储(如果不存在则存储)
value, loaded := m.LoadOrStore("city", "北京")
fmt.Println("City:", value, "loaded:", loaded)
// 遍历
m.Range(func(key, value interface{}) bool {
fmt.Printf("%v: %v\n", key, value)
return true
})
// 删除
m.Delete("age")
// 比较并删除(Go 1.20+)
m.CompareAndDelete("name", "张三")
// 比较并交换(Go 1.20+)
m.CompareAndSwap("age", 25, 30)
}
适用场景:
| 场景 | 推荐使用 | 原因 |
|---|---|---|
| 读多写少 | sync.Map | 针对此场景优化 |
| 键集合稳定 | sync.Map | 避免频繁增删键 |
| 需要并发访问 | sync.Map | 内置并发安全 |
| 频繁写入 | map + RWMutex | sync.Map 对写场景优化较少 |
| 键集合频繁变化 | map + RWMutex | 频繁增删会影响性能 |
sync.Map 与普通 map + RWMutex 的对比:
// 方式一:sync.Map
var sm sync.Map
sm.Store("key", "value")
v, ok := sm.Load("key")
// 方式二:map + RWMutex
var (
m = make(map[string]string)
mu sync.RWMutex
)
mu.Lock()
m["key"] = "value"
mu.Unlock()
mu.RLock()
v, ok := m["key"]
mu.RUnlock()
sync.Map 的优势在于它内部使用了更复杂的分片机制,在读多写少的场景下可以减少锁竞争。但在写入频繁的场景下,普通的 map 配合 RWMutex 可能更简单高效。
常见问题与陷阱
死锁
死锁是指两个或多个 goroutine 相互等待,导致都无法继续执行:
// ❌ 死锁示例 1:无缓冲 channel 只有发送没有接收
func deadlock1() {
ch := make(chan int)
ch <- 42 // 阻塞,没有接收者
}
// ❌ 死锁示例 2:循环等待
func deadlock2() {
ch1 := make(chan int)
ch2 := make(chan int)
go func() {
<-ch1 // 等待 ch1
ch2 <- 1
}()
ch1 <- 1 // 等待被接收
<-ch2 // 等待 ch2
}
// ✅ 正确:使用缓冲 channel
func correct() {
ch := make(chan int, 1)
ch <- 42
fmt.Println(<-ch)
}
Goroutine 泄露
Goroutine 泄露是指 goroutine 永远不会退出,持续占用资源:
// ❌ 泄露示例:goroutine 永远阻塞
func leak() {
ch := make(chan int)
go func() {
<-ch // 永远没有人发送数据
}()
}
// ✅ 正确:使用 context 或 done channel
func noLeak() {
ch := make(chan int)
done := make(chan struct{})
go func() {
select {
case <-ch:
// 处理数据
case <-done:
return // 可以退出
}
}()
close(done) // 通知 goroutine 退出
}
竞态条件
多个 goroutine 同时访问共享数据且至少有一个是写操作:
// ❌ 竞态条件
var counter int
func raceCondition() {
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
counter++ // 竞态!
}()
}
wg.Wait()
fmt.Println(counter) // 可能不是 1000
}
// ✅ 正确:使用 mutex 或 atomic
var (
counterSafe int
mu sync.Mutex
)
func noRaceCondition() {
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
mu.Lock()
counterSafe++
mu.Unlock()
}()
}
wg.Wait()
fmt.Println(counterSafe) // 一定是 1000
}
可以使用 go run -race 检测竞态条件。
优雅关闭模式
在生产环境中,服务需要能够优雅关闭,等待所有正在处理的请求完成后再退出。
HTTP 服务优雅关闭
package main
import (
"context"
"fmt"
"net/http"
"os"
"os/signal"
"syscall"
"time"
)
func main() {
mux := http.NewServeMux()
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
time.Sleep(2 * time.Second) // 模拟处理
fmt.Fprintln(w, "Hello")
})
server := &http.Server{
Addr: ":8080",
Handler: mux,
}
// 启动服务器
go func() {
if err := server.ListenAndServe(); err != http.ErrServerClosed {
fmt.Printf("服务器错误: %v\n", err)
}
}()
fmt.Println("服务器启动在 :8080")
// 等待中断信号
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
fmt.Println("正在关闭服务器...")
// 优雅关闭,最多等待 30 秒
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
if err := server.Shutdown(ctx); err != nil {
fmt.Printf("服务器关闭错误: %v\n", err)
}
fmt.Println("服务器已关闭")
}
Worker 优雅关闭
func gracefulWorkerPool(numWorkers int, jobs <-chan Job) {
var wg sync.WaitGroup
ctx, cancel := context.WithCancel(context.Background())
// 启动 workers
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
worker(ctx, id, jobs)
}(i)
}
// 等待关闭信号
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
fmt.Println("正在停止 workers...")
cancel() // 通知所有 workers 停止
// 等待所有 workers 完成当前任务
wg.Wait()
fmt.Println("所有 workers 已停止")
}
func worker(ctx context.Context, id int, jobs <-chan Job) {
for {
select {
case <-ctx.Done():
fmt.Printf("Worker %d 收到停止信号\n", id)
return
case job, ok := <-jobs:
if !ok {
return
}
processJob(job)
}
}
}
errgroup
golang.org/x/sync/errgroup 包提供了更方便的并发控制方式,特别适合需要并行执行多个任务并在任一失败时取消的场景。
package main
import (
"context"
"fmt"
"net/http"
"time"
"golang.org/x/sync/errgroup"
)
func main() {
g, ctx := errgroup.WithContext(context.Background())
urls := []string{
"https://www.google.com",
"https://www.github.com",
"https://invalid.url", // 会失败
}
for _, url := range urls {
url := url // 捕获变量
g.Go(func() error {
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
return err
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
fmt.Printf("URL: %s, Status: %d\n", url, resp.StatusCode)
return nil
})
}
// 等待所有 goroutine 完成
if err := g.Wait(); err != nil {
fmt.Printf("发生错误: %v\n", err)
}
fmt.Println("所有请求完成")
}
errgroup 限制并发数
func fetchAll(ctx context.Context, urls []string) error {
g, ctx := errgroup.WithContext(ctx)
// 限制最多 5 个并发
g.SetLimit(5)
for _, url := range urls {
url := url
g.Go(func() error {
return fetchURL(ctx, url)
})
}
return g.Wait()
}
testing/synctest:并发测试(Go 1.24实验性)
Go 1.24 引入了实验性的 testing/synctest 包,专门用于测试并发代码。这个包解决了并发测试中常见的时序问题和不确定性。
为什么需要 synctest?
测试并发代码的挑战:
- 时序不确定:goroutine 的执行顺序不可预测
- 时间等待:使用
time.Sleep会让测试变慢且不稳定 - 竞态条件:测试可能因时序不同而产生不同结果
synctest 通过提供一个"气泡"环境来解决这些问题,在这个环境中:
- 时间包函数操作的是假时钟
- 可以等待所有 goroutine 阻塞
启用 synctest
由于是实验性功能,必须通过环境变量启用:
# 运行测试时启用
GOEXPERIMENT=synctest go test
# 或在构建时设置
GOEXPERIMENT=synctest go build
基本用法
synctest.Run:创建隔离的测试气泡
package main
import (
"testing"
"testing/synctest"
"time"
)
func TestConcurrentTimer(t *testing.T) {
synctest.Run(func() {
// 在这个气泡内,time 包使用假时钟
done := make(chan time.Time)
go func() {
// 这个 time.After 使用假时钟
done <- <-time.After(5 * time.Second)
}()
// 等待 goroutine 阻塞
synctest.Wait()
// 现在可以安全地验证状态
select {
case <-done:
t.Error("timer should not have fired yet")
default:
// 预期:计时器还未触发
}
})
}
synctest.Wait:等待所有 goroutine 阻塞
synctest.Wait() 会阻塞直到当前气泡内的所有 goroutine 都处于阻塞状态(等待 channel 操作、锁、time.Sleep 等)。
func TestProducerConsumer(t *testing.T) {
synctest.Run(func() {
ch := make(chan int, 1)
results := make([]int, 0)
// 生产者
go func() {
ch <- 1
ch <- 2
close(ch)
}()
// 消费者
go func() {
for v := range ch {
results = append(results, v)
}
}()
// 等待所有 goroutine 完成工作并阻塞
synctest.Wait()
// 现在可以安全检查结果
if len(results) != 2 {
t.Errorf("expected 2 results, got %d", len(results))
}
})
}
测试超时逻辑
synctest 特别适合测试超时和计时器相关的代码:
func TestTimeout(t *testing.T) {
synctest.Run(func() {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
done := make(chan struct{})
go func() {
defer close(done)
select {
case <-ctx.Done():
// 超时处理
case <-time.After(2 * time.Second):
t.Error("should have timed out first")
}
}()
// 等待 goroutine 阻塞
synctest.Wait()
// 验证 context 已过期
if ctx.Err() != context.DeadlineExceeded {
t.Errorf("expected DeadlineExceeded, got %v", ctx.Err())
}
})
}
测试重试逻辑
func TestRetryWithBackoff(t *testing.T) {
synctest.Run(func() {
attempts := 0
err := retryWithBackoff(3, func() error {
attempts++
if attempts < 3 {
return errors.New("not yet")
}
return nil
})
if err != nil {
t.Errorf("expected success, got %v", err)
}
if attempts != 3 {
t.Errorf("expected 3 attempts, got %d", attempts)
}
})
}
func retryWithBackoff(maxAttempts int, fn func() error) error {
backoff := 100 * time.Millisecond
for i := 0; i < maxAttempts; i++ {
if err := fn(); err == nil {
return nil
}
if i < maxAttempts-1 {
time.Sleep(backoff)
backoff *= 2
}
}
return errors.New("max attempts exceeded")
}
完整示例:测试并发缓存
package cache
import (
"testing"
"testing/synctest"
"time"
)
type Cache struct {
data map[string]string
fresh map[string]time.Time
ttl time.Duration
}
func NewCache(ttl time.Duration) *Cache {
return &Cache{
data: make(map[string]string),
fresh: make(map[string]time.Time),
ttl: ttl,
}
}
func (c *Cache) Get(key string) (string, bool) {
if freshTime, ok := c.fresh[key]; ok {
if time.Since(freshTime) > c.ttl {
return "", false // 已过期
}
}
v, ok := c.data[key]
return v, ok
}
func (c *Cache) Set(key, value string) {
c.data[key] = value
c.fresh[key] = time.Now()
}
func TestCacheExpiration(t *testing.T) {
synctest.Run(func() {
cache := NewCache(1 * time.Second)
// 设置缓存
cache.Set("key", "value")
// 立即获取应该成功
if v, ok := cache.Get("key"); !ok || v != "value" {
t.Error("expected to get value immediately")
}
// 等待 TTL 过期
time.Sleep(2 * time.Second)
synctest.Wait()
// 过期后应该获取失败
if _, ok := cache.Get("key"); ok {
t.Error("expected cache to be expired")
}
})
}
注意事项
- 实验性:API 可能会在未来的 Go 版本中变化
- 构建标签:必须通过
-tags=synctest或GOEXPERIMENT=synctest启用 - 气泡隔离:每个
synctest.Run创建独立的气泡,气泡间的 goroutine 互不影响 - 时间操作:气泡内所有
time包函数使用假时钟
与传统方法的对比
| 方法 | 优点 | 缺点 |
|---|---|---|
| time.Sleep | 简单 | 测试慢、不稳定 |
| mock 时间 | 灵活 | 需要额外代码、复杂 |
| synctest | 原生支持、快速、确定 | 实验性、API 可能变化 |
最佳实践
1. 使用 defer 关闭资源
func processFile(filename string) error {
file, err := os.Open(filename)
if err != nil {
return err
}
defer file.Close() // 确保文件关闭
// 处理文件...
return nil
}
2. 正确处理 goroutine 启动
// ❌ 错误:在 goroutine 内部调用 wg.Add
go func() {
wg.Add(1) // 可能在 wg.Wait() 之后才执行
defer wg.Done()
// ...
}()
// ✅ 正确:在启动 goroutine 前调用 wg.Add
wg.Add(1)
go func() {
defer wg.Done()
// ...
}()
3. 使用 context 进行取消
func operation(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
// 执行操作
}
return nil
}
4. 避免 goroutine 泄露
func worker(ctx context.Context, jobs <-chan Job) {
for {
select {
case job, ok := <-jobs:
if !ok {
return // channel 已关闭
}
process(job)
case <-ctx.Done():
return // 收到取消信号
}
}
}
5. 合理使用缓冲 channel
// 已知数量的数据:使用适当大小的缓冲
ch := make(chan int, 100)
// 未知数量的数据:使用无缓冲或小缓冲
ch := make(chan int) // 或 make(chan int, 1)
6. 使用 panic 保护
func safeGo(fn func()) {
go func() {
defer func() {
if r := recover(); r != nil {
fmt.Printf("goroutine panic: %v\n%s\n", r, debug.Stack())
}
}()
fn()
}()
}
// 使用
safeGo(func() {
// 可能 panic 的代码
panic("oops")
})
// 程序不会崩溃
小结
本章我们学习了:
- Goroutine:轻量级并发执行单元,使用
go关键字启动 - Channel:goroutine 之间的通信机制,分为有缓冲和无缓冲
- Select:多路复用,同时监听多个 channel
- sync 包:Mutex、RWMutex、WaitGroup、Once、Cond、Pool、Map
- atomic 包:原子操作,用于无锁并发编程
- Context:传递取消信号、超时和请求范围的值
- 并发模式:Pipeline、Fan-out/Fan-in、Worker Pool、生产者-消费者
- 常见问题:死锁、泄露、竞态条件
练习
- 创建一个并发的下载器,使用 Worker Pool 模式限制并发数
- 使用 Pipeline 模式实现数据处理管道(过滤 -> 转换 -> 汇总)
- 使用 Context 实现一个带超时的 HTTP 请求客户端
- 实现"哲学家就餐"问题,避免死锁
- 使用 sync.Map 实现一个并发安全的缓存系统