跳到主要内容

Go 并发编程

本章将介绍 Go 语言强大的并发编程能力。Go 从语言层面原生支持并发,通过 goroutine 和 channel 使得并发编程变得简单而高效。

并发与并行

在开始学习 Go 的并发特性之前,我们需要理解两个重要概念:

  • 并发(Concurrency):是指同时处理多个任务的能力。在单核 CPU 上,通过时间片轮转的方式交替执行多个任务,从宏观上看像是同时进行的。
  • 并行(Parallelism):是指同时执行多个任务的能力。需要多核 CPU 支持,每个核心真正同时执行不同的任务。
关键区别

并发关注的是结构,即如何组织程序来处理多个任务;并行关注的是执行,即如何同时运行多个任务。Go 的并发模型可以让程序在单核上实现并发,在多核上实现并行。

Go 的并发模型

Go 采用了 CSP(Communicating Sequential Processes,通信顺序进程) 模型,核心理念是:

"不要通过共享内存来通信,而要通过通信来共享内存。"

这意味着在 Go 中,goroutine 之间通过 channel 传递数据来通信,而不是通过共享变量并加锁来同步。

Goroutine

Goroutine 是 Go 中轻量级的并发执行单元。它比操作系统线程更轻量,初始栈大小只有 2KB(可以动态伸缩),而线程通常需要 1MB 以上的栈空间。

创建 Goroutine

使用 go 关键字启动一个 goroutine:

package main

import (
"fmt"
"time"
)

func sayHello() {
fmt.Println("Hello from goroutine!")
}

func main() {
// 启动 goroutine
go sayHello()

fmt.Println("Hello from main!")

// 等待 goroutine 执行完成
time.Sleep(time.Second)
}

输出顺序可能为:

Hello from main!
Hello from goroutine!
注意

main 函数本身也是一个 goroutine。当 main 函数返回时,所有其他 goroutine 都会被立即终止,不管它们是否执行完成。这就是为什么上面代码需要 time.Sleep 来等待 goroutine 完成。

匿名函数 Goroutine

可以使用匿名函数创建 goroutine:

func main() {
// 方式一:立即执行的匿名函数
go func() {
fmt.Println("匿名 goroutine")
}()

// 方式二:带参数的匿名函数
message := "Hello"
go func(msg string) {
fmt.Println(msg)
}(message)

time.Sleep(time.Second)
}

闭包变量捕获问题

在使用闭包时需要注意变量捕获的问题:

// ❌ 错误示例:所有 goroutine 可能都打印 3
for i := 0; i < 3; i++ {
go func() {
fmt.Println(i) // 捕获的是变量 i 的引用
}()
}

// ✅ 正确示例:将 i 作为参数传递
for i := 0; i < 3; i++ {
go func(n int) {
fmt.Println(n) // 每个 goroutine 有自己的 n 副本
}(i)
}

使用 sync.WaitGroup 等待

使用 time.Sleep 等待 goroutine 完成是不推荐的,应该使用 sync.WaitGroup

package main

import (
"fmt"
"sync"
"time"
)

func worker(id int, wg *sync.WaitGroup) {
// 延迟调用 Done,确保函数退出时通知 WaitGroup
defer wg.Done()

fmt.Printf("Worker %d starting\n", id)
time.Sleep(time.Second)
fmt.Printf("Worker %d done\n", id)
}

func main() {
var wg sync.WaitGroup

// 启动 5 个 goroutine
for i := 1; i <= 5; i++ {
// 在启动 goroutine 前增加计数器
wg.Add(1)
go worker(i, &wg)
}

// 等待所有 goroutine 完成
wg.Wait()
fmt.Println("All workers done")
}

WaitGroup 的三个方法:

  • Add(delta int):增加计数器,通常在启动 goroutine 前调用
  • Done():减少计数器,等价于 Add(-1)
  • Wait():阻塞直到计数器归零
最佳实践

wg.Add(1) 应该在启动 goroutine 之前调用,而不是在 goroutine 内部调用,以避免竞态条件。

Channel

Channel(通道)是 goroutine 之间通信的管道。通过 channel,一个 goroutine 可以发送值给另一个 goroutine。

创建 Channel

使用 make 创建 channel:

// 无缓冲 channel
ch := make(chan int)

// 有缓冲 channel,容量为 10
ch := make(chan int, 10)

发送和接收

使用 <- 操作符发送和接收数据:

ch := make(chan int)

// 发送数据
ch <- 42

// 接收数据
value := <-ch

// 接收数据并检查 channel 是否已关闭
value, ok := <-ch
// 如果 ok 为 false,表示 channel 已关闭且没有更多数据

无缓冲 Channel

无缓冲 channel 也称为同步 channel,发送操作会阻塞直到另一个 goroutine 执行接收操作:

func main() {
ch := make(chan string)

// 发送 goroutine
go func() {
ch <- "Hello" // 阻塞,直到有人接收
fmt.Println("消息已发送")
}()

// 接收
msg := <-ch // 阻塞,直到有数据可接收
fmt.Println(msg)
}

特点:

  • 发送和接收操作是同步的
  • 每次发送必须有对应的接收,否则会死锁
  • 适合用于 goroutine 间的同步

有缓冲 Channel

有缓冲 channel 有一个固定大小的缓冲区,发送操作只有在缓冲区满时才会阻塞:

func main() {
// 创建容量为 2 的缓冲 channel
ch := make(chan int, 2)

// 可以连续发送两个值而不阻塞
ch <- 1
ch <- 2

// 第三个发送会阻塞(缓冲区已满)
// ch <- 3 // 死锁!

// 接收数据
fmt.Println(<-ch) // 1
fmt.Println(<-ch) // 2
}

特点:

  • 发送操作在缓冲区未满时不会阻塞
  • 接收操作在缓冲区不为空时不会阻塞
  • 适合用于生产者-消费者模式

关闭 Channel

使用 close 函数关闭 channel:

ch := make(chan int, 3)

ch <- 1
ch <- 2
close(ch) // 关闭 channel

// 从已关闭的 channel 接收
value, ok := <-ch
fmt.Println(value, ok) // 1 true

value, ok = <-ch
fmt.Println(value, ok) // 2 true

value, ok = <-ch
fmt.Println(value, ok) // 0 false(channel 已空且关闭)

关闭 channel 的规则:

  1. 只有发送者应该关闭 channel,接收者不应该关闭
  2. 向已关闭的 channel 发送数据会 panic
  3. 从已关闭的 channel 接收会返回零值和 false
  4. 关闭一个已关闭的 channel 会 panic
  5. 关闭 nil channel 会 panic

使用 range 遍历 Channel

可以使用 for range 持续从 channel 接收数据,直到 channel 关闭:

func main() {
ch := make(chan int, 5)

// 发送数据
go func() {
for i := 0; i < 5; i++ {
ch <- i
}
close(ch) // 发送完毕后关闭
}()

// 接收数据直到 channel 关闭
for value := range ch {
fmt.Println(value)
}

fmt.Println("Channel 已关闭")
}

单向 Channel

Go 支持单向 channel 类型,用于限制 channel 的操作方向:

// 只发送 channel
func sender(ch chan<- int) {
ch <- 42
// <-ch // 编译错误!不能从只发送 channel 接收
}

// 只接收 channel
func receiver(ch <-chan int) {
value := <-ch
fmt.Println(value)
// ch <- 1 // 编译错误!不能向只接收 channel 发送
}

func main() {
ch := make(chan int)
go sender(ch)
go receiver(ch)
}

类型转换:

ch := make(chan int)        // 双向 channel
var sendCh chan<- int = ch // 转换为只发送
var recvCh <-chan int = ch // 转换为只接收

Select

select 语句用于同时等待多个 channel 操作,类似于网络编程中的 select/poll/epoll

基本用法

func main() {
ch1 := make(chan string)
ch2 := make(chan string)

go func() {
time.Sleep(time.Second)
ch1 <- "from ch1"
}()

go func() {
time.Sleep(500 * time.Millisecond)
ch2 <- "from ch2"
}()

// 等待最先到达的数据
select {
case msg1 := <-ch1:
fmt.Println("Received", msg1)
case msg2 := <-ch2:
fmt.Println("Received", msg2)
}
}

超时处理

使用 time.After 实现超时:

func main() {
ch := make(chan string)

go func() {
time.Sleep(2 * time.Second)
ch <- "result"
}()

select {
case res := <-ch:
fmt.Println(res)
case <-time.After(time.Second):
fmt.Println("Timeout!")
}
}

非阻塞操作

使用 default 实现非阻塞操作:

func main() {
ch := make(chan int, 1)

select {
case ch <- 42:
fmt.Println("发送成功")
default:
fmt.Println("发送失败(channel 满)")
}

select {
case value := <-ch:
fmt.Println("接收:", value)
default:
fmt.Println("接收失败(channel 空)")
}
}

检查 Channel 是否已关闭

func main() {
ch := make(chan int, 1)
close(ch)

select {
case value, ok := <-ch:
if ok {
fmt.Println("接收到:", value)
} else {
fmt.Println("Channel 已关闭")
}
default:
fmt.Println("没有数据")
}
}

并发安全

当多个 goroutine 同时访问共享资源时,需要使用同步机制来保证数据安全。

sync.Mutex

互斥锁用于保护临界区,同一时间只有一个 goroutine 可以访问:

package main

import (
"fmt"
"sync"
)

type Counter struct {
mu sync.Mutex
value int
}

func (c *Counter) Increment() {
c.mu.Lock() // 获取锁
defer c.mu.Unlock() // 释放锁
c.value++
}

func (c *Counter) Value() int {
c.mu.Lock()
defer c.mu.Unlock()
return c.value
}

func main() {
counter := Counter{}
var wg sync.WaitGroup

// 启动 1000 个 goroutine
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
counter.Increment()
}()
}

wg.Wait()
fmt.Println("Counter:", counter.Value()) // 1000
}

sync.RWMutex

读写锁允许多个读者同时读取,但写者需要独占访问:

package main

import (
"fmt"
"sync"
"time"
)

type Cache struct {
mu sync.RWMutex
data map[string]string
}

func (c *Cache) Get(key string) string {
c.mu.RLock() // 读锁
defer c.mu.RUnlock()
return c.data[key]
}

func (c *Cache) Set(key, value string) {
c.mu.Lock() // 写锁
defer c.mu.Unlock()
c.data[key] = value
}

func main() {
cache := &Cache{
data: make(map[string]string),
}

// 写操作
go func() {
cache.Set("key", "value")
}()

// 多个读操作可以同时进行
for i := 0; i < 3; i++ {
go func() {
fmt.Println(cache.Get("key"))
}()
}
}

读写锁的规则:

  • 读锁可以同时被多个 goroutine 获取
  • 写锁是独占的,只能被一个 goroutine 获取
  • 如果有写锁存在,读锁会被阻塞
  • 如果有读锁存在,写锁会被阻塞

sync/atomic

Go 提供了原子操作包,用于对基本类型进行无锁并发操作:

package main

import (
"fmt"
"sync"
"sync/atomic"
)

func main() {
var counter int64
var wg sync.WaitGroup

for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
atomic.AddInt64(&counter, 1) // 原子加操作
}()
}

wg.Wait()
fmt.Println("Counter:", counter) // 1000

// 比较并交换(Compare And Swap)
old := atomic.LoadInt64(&counter)
swapped := atomic.CompareAndSwapInt64(&counter, old, 0)
fmt.Println("Swapped:", swapped) // true
fmt.Println("Counter:", counter) // 0
}

常用的原子操作:

  • Add:原子加法
  • Load:原子读取
  • Store:原子存储
  • Swap:原子交换
  • CompareAndSwap:比较并交换

atomic.Int64(Go 1.19+)

Go 1.19 引入了新的原子类型,使用更加方便:

package main

import (
"fmt"
"sync"
"sync/atomic"
)

func main() {
var counter atomic.Int64
var wg sync.WaitGroup

for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
counter.Add(1)
}()
}

wg.Wait()
fmt.Println("Counter:", counter.Load()) // 1000

// 原子交换
old := counter.Swap(0)
fmt.Println("Old value:", old) // 1000
fmt.Println("New value:", counter.Load()) // 0

// 比较并交换
swapped := counter.CompareAndSwap(0, 42)
fmt.Println("Swapped:", swapped) // true
fmt.Println("Value:", counter.Load()) // 42
}

Context

Context 包用于在 API 边界和进程之间传递截止时间、取消信号和其他请求范围的值。

创建 Context

import "context"

// 背景 context,作为根 context
ctx := context.Background()

// TODO context,用于不确定使用哪个 context 时
ctx := context.TODO()

取消信号

package main

import (
"context"
"fmt"
"time"
)

func worker(ctx context.Context) {
for {
select {
case <-ctx.Done():
fmt.Println("Worker stopped:", ctx.Err())
return
default:
fmt.Println("Working...")
time.Sleep(500 * time.Millisecond)
}
}
}

func main() {
// 创建可取消的 context
ctx, cancel := context.WithCancel(context.Background())

go worker(ctx)

time.Sleep(2 * time.Second)
cancel() // 取消 context

time.Sleep(time.Second) // 等待 worker 退出
}

超时控制

func main() {
// 创建带超时的 context
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel() // 释放资源

select {
case <-time.After(2 * time.Second):
fmt.Println("操作完成")
case <-ctx.Done():
fmt.Println("超时:", ctx.Err()) // context deadline exceeded
}
}

截止时间

func main() {
// 设置截止时间
deadline := time.Now().Add(2 * time.Second)
ctx, cancel := context.WithDeadline(context.Background(), deadline)
defer cancel()

// 检查截止时间
d, ok := ctx.Deadline()
fmt.Println("Deadline:", d, "set:", ok)

select {
case <-ctx.Done():
fmt.Println("已到达截止时间")
}
}

传递值

func main() {
// 创建带值的 context
ctx := context.WithValue(context.Background(), "userID", 12345)
ctx = context.WithValue(ctx, "requestID", "abc-123")

// 获取值
if userID, ok := ctx.Value("userID").(int); ok {
fmt.Println("UserID:", userID)
}

if requestID, ok := ctx.Value("requestID").(string); ok {
fmt.Println("RequestID:", requestID)
}
}
注意

Context 的 Value 应该用于传递请求范围的数据,如 trace ID、用户 ID 等,不要用于传递可选参数或业务数据。

HTTP 请求中的 Context

func main() {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

req, err := http.NewRequestWithContext(ctx, "GET", "https://example.com", nil)
if err != nil {
log.Fatal(err)
}

resp, err := http.DefaultClient.Do(req)
if err != nil {
log.Fatal(err)
}
defer resp.Body.Close()

fmt.Println("Status:", resp.Status)
}

并发模式

Pipeline 模式

Pipeline 是一系列通过 channel 连接的处理阶段:

package main

import "fmt"

// 第一阶段:生成数据
func generate(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}

// 第二阶段:处理数据(平方)
func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}

func main() {
// 构建 pipeline
c := generate(2, 3)
out := square(c)

// 消费结果
for n := range out {
fmt.Println(n) // 4, 9
}

// 可以链式调用
for n := range square(square(generate(2, 3))) {
fmt.Println(n) // 16, 81
}
}

Fan-out / Fan-in 模式

Fan-out:多个 goroutine 从同一个 channel 读取数据,实现并行处理。 Fan-in:将多个 channel 的结果合并到一个 channel。

package main

import (
"fmt"
"sync"
)

// Fan-out:启动多个 worker 处理同一个输入 channel
func worker(id int, in <-chan int, out chan<- int) {
for n := range in {
fmt.Printf("Worker %d: processing %d\n", id, n)
out <- n * n
}
}

// Fan-in:合并多个 channel 到一个 channel
func merge(channels ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)

// 为每个输入 channel 启动一个 goroutine
output := func(c <-chan int) {
for n := range c {
out <- n
}
wg.Done()
}

wg.Add(len(channels))
for _, c := range channels {
go output(c)
}

// 等待所有输入 channel 关闭后,关闭输出 channel
go func() {
wg.Wait()
close(out)
}()

return out
}

func main() {
// 输入 channel
in := make(chan int, 10)

// 启动 3 个 worker
out1 := make(chan int)
out2 := make(chan int)
out3 := make(chan int)

go worker(1, in, out1)
go worker(2, in, out2)
go worker(3, in, out3)

// 发送数据
for i := 0; i < 10; i++ {
in <- i
}
close(in)

// 关闭输出 channel
go func() {
// 等待一段时间让 worker 处理完
time.Sleep(time.Second)
close(out1)
close(out2)
close(out3)
}()

// 合并结果
for n := range merge(out1, out2, out3) {
fmt.Println(n)
}
}

Worker Pool 模式

创建固定数量的 worker 来处理任务:

package main

import (
"fmt"
"sync"
"time"
)

type Job struct {
ID int
Data string
}

type Result struct {
Job Job
Result string
}

func worker(id int, jobs <-chan Job, results chan<- Result, wg *sync.WaitGroup) {
defer wg.Done()
for job := range jobs {
fmt.Printf("Worker %d: processing job %d\n", id, job.ID)
time.Sleep(time.Second) // 模拟处理
results <- Result{
Job: job,
Result: fmt.Sprintf("Processed: %s", job.Data),
}
}
}

func main() {
const numWorkers = 3
const numJobs = 10

jobs := make(chan Job, numJobs)
results := make(chan Result, numJobs)

var wg sync.WaitGroup

// 启动 worker
for i := 1; i <= numWorkers; i++ {
wg.Add(1)
go worker(i, jobs, results, &wg)
}

// 发送任务
for i := 1; i <= numJobs; i++ {
jobs <- Job{ID: i, Data: fmt.Sprintf("Task %d", i)}
}
close(jobs)

// 等待所有 worker 完成
go func() {
wg.Wait()
close(results)
}()

// 收集结果
for result := range results {
fmt.Printf("Result: Job %d -> %s\n", result.Job.ID, result.Result)
}
}

生产者-消费者模式

package main

import (
"fmt"
"math/rand"
"sync"
"time"
)

func producer(id int, ch chan<- int, wg *sync.WaitGroup) {
defer wg.Done()
for i := 0; i < 5; i++ {
num := rand.Intn(100)
ch <- num
fmt.Printf("Producer %d: produced %d\n", id, num)
time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond)
}
}

func consumer(id int, ch <-chan int, wg *sync.WaitGroup) {
defer wg.Done()
for num := range ch {
fmt.Printf("Consumer %d: consumed %d\n", id, num)
time.Sleep(time.Duration(rand.Intn(300)) * time.Millisecond)
}
}

func main() {
ch := make(chan int, 5)
var wg sync.WaitGroup

// 启动生产者
for i := 1; i <= 2; i++ {
wg.Add(1)
go producer(i, ch, &wg)
}

// 启动消费者
for i := 1; i <= 3; i++ {
wg.Add(1)
go consumer(i, ch, &wg)
}

// 等待生产者完成后关闭 channel
go func() {
wg.Wait()
close(ch)
}()

time.Sleep(5 * time.Second)
}

速率限制模式

使用 ticker 实现速率限制:

package main

import (
"fmt"
"time"
)

func main() {
requests := make(chan int, 10)
for i := 0; i < 10; i++ {
requests <- i
}
close(requests)

// 每 200ms 处理一个请求
limiter := time.Tick(200 * time.Millisecond)

for req := range requests {
<-limiter // 等待 ticker
fmt.Println("Request", req, time.Now().Format("15:04:05.000"))
}
}

等待超时模式

func waitForResult() (string, error) {
result := make(chan string, 1)

go func() {
time.Sleep(2 * time.Second) // 模拟耗时操作
result <- "success"
}()

select {
case res := <-result:
return res, nil
case <-time.After(time.Second):
return "", fmt.Errorf("timeout")
}
}

其他同步原语

sync.Once

确保某个操作只执行一次:

package main

import (
"fmt"
"sync"
)

type Singleton struct {
name string
}

var (
instance *Singleton
once sync.Once
)

func GetInstance() *Singleton {
once.Do(func() {
fmt.Println("Creating instance...")
instance = &Singleton{name: "Singleton"}
})
return instance
}

func main() {
var wg sync.WaitGroup

for i := 0; i < 5; i++ {
wg.Add(1)
go func() {
defer wg.Done()
inst := GetInstance()
fmt.Printf("Instance: %p\n", inst)
}()
}

wg.Wait()
// 输出:Creating instance... 只出现一次
// 所有 goroutine 获取的是同一个实例
}

sync.Cond

条件变量,用于等待某个条件成立:

package main

import (
"fmt"
"sync"
"time"
)

type Queue struct {
items []int
cond *sync.Cond
}

func NewQueue() *Queue {
return &Queue{
cond: sync.NewCond(&sync.Mutex{}),
}
}

func (q *Queue) Put(item int) {
q.cond.L.Lock()
defer q.cond.L.Unlock()

q.items = append(q.items, item)
fmt.Println("Produced:", item)
q.cond.Signal() // 通知等待的消费者
}

func (q *Queue) Get() int {
q.cond.L.Lock()
defer q.cond.L.Unlock()

// 等待队列不为空
for len(q.items) == 0 {
q.cond.Wait()
}

item := q.items[0]
q.items = q.items[1:]
fmt.Println("Consumed:", item)
return item
}

func main() {
queue := NewQueue()

// 消费者
go func() {
for i := 0; i < 5; i++ {
queue.Get()
}
}()

// 生产者
for i := 0; i < 5; i++ {
queue.Put(i)
time.Sleep(100 * time.Millisecond)
}

time.Sleep(time.Second)
}

sync.Pool

对象池,用于复用临时对象,减少内存分配:

package main

import (
"fmt"
"sync"
)

var pool = sync.Pool{
New: func() interface{} {
fmt.Println("Creating new object")
return make([]byte, 1024)
},
}

func main() {
// 从池中获取对象
data := pool.Get().([]byte)
fmt.Println("Got object, len:", len(data))

// 使用对象...

// 将对象放回池中
pool.Put(data)

// 再次获取,可能复用之前的对象
data2 := pool.Get().([]byte)
fmt.Println("Got object again, len:", len(data2))

// 输出:
// Creating new object(只有第一次)
// Got object, len: 1024
// Got object again, len: 1024
}

sync.Map

并发安全的 map,适用于读多写少的场景:

package main

import (
"fmt"
"sync"
)

func main() {
var m sync.Map

// 存储
m.Store("name", "张三")
m.Store("age", 25)

// 读取
if value, ok := m.Load("name"); ok {
fmt.Println("Name:", value)
}

// 读取或存储(如果不存在则存储)
value, loaded := m.LoadOrStore("city", "北京")
fmt.Println("City:", value, "loaded:", loaded)

// 遍历
m.Range(func(key, value interface{}) bool {
fmt.Printf("%v: %v\n", key, value)
return true
})

// 删除
m.Delete("age")
}

常见问题与陷阱

死锁

死锁是指两个或多个 goroutine 相互等待,导致都无法继续执行:

// ❌ 死锁示例 1:无缓冲 channel 只有发送没有接收
func deadlock1() {
ch := make(chan int)
ch <- 42 // 阻塞,没有接收者
}

// ❌ 死锁示例 2:循环等待
func deadlock2() {
ch1 := make(chan int)
ch2 := make(chan int)

go func() {
<-ch1 // 等待 ch1
ch2 <- 1
}()

ch1 <- 1 // 等待被接收
<-ch2 // 等待 ch2
}

// ✅ 正确:使用缓冲 channel
func correct() {
ch := make(chan int, 1)
ch <- 42
fmt.Println(<-ch)
}

Goroutine 泄露

Goroutine 泄露是指 goroutine 永远不会退出,持续占用资源:

// ❌ 泄露示例:goroutine 永远阻塞
func leak() {
ch := make(chan int)
go func() {
<-ch // 永远没有人发送数据
}()
}

// ✅ 正确:使用 context 或 done channel
func noLeak() {
ch := make(chan int)
done := make(chan struct{})

go func() {
select {
case <-ch:
// 处理数据
case <-done:
return // 可以退出
}
}()

close(done) // 通知 goroutine 退出
}

竞态条件

多个 goroutine 同时访问共享数据且至少有一个是写操作:

// ❌ 竞态条件
var counter int

func raceCondition() {
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
counter++ // 竞态!
}()
}
wg.Wait()
fmt.Println(counter) // 可能不是 1000
}

// ✅ 正确:使用 mutex 或 atomic
var (
counterSafe int
mu sync.Mutex
)

func noRaceCondition() {
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
mu.Lock()
counterSafe++
mu.Unlock()
}()
}
wg.Wait()
fmt.Println(counterSafe) // 一定是 1000
}

可以使用 go run -race 检测竞态条件。

最佳实践

1. 使用 defer 关闭资源

func processFile(filename string) error {
file, err := os.Open(filename)
if err != nil {
return err
}
defer file.Close() // 确保文件关闭

// 处理文件...
return nil
}

2. 正确处理 goroutine 启动

// ❌ 错误:在 goroutine 内部调用 wg.Add
go func() {
wg.Add(1) // 可能在 wg.Wait() 之后才执行
defer wg.Done()
// ...
}()

// ✅ 正确:在启动 goroutine 前调用 wg.Add
wg.Add(1)
go func() {
defer wg.Done()
// ...
}()

3. 使用 context 进行取消

func operation(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
// 执行操作
}
return nil
}

4. 避免 goroutine 泄露

func worker(ctx context.Context, jobs <-chan Job) {
for {
select {
case job, ok := <-jobs:
if !ok {
return // channel 已关闭
}
process(job)
case <-ctx.Done():
return // 收到取消信号
}
}
}

5. 合理使用缓冲 channel

// 已知数量的数据:使用适当大小的缓冲
ch := make(chan int, 100)

// 未知数量的数据:使用无缓冲或小缓冲
ch := make(chan int) // 或 make(chan int, 1)

小结

本章我们学习了:

  1. Goroutine:轻量级并发执行单元,使用 go 关键字启动
  2. Channel:goroutine 之间的通信机制,分为有缓冲和无缓冲
  3. Select:多路复用,同时监听多个 channel
  4. sync 包:Mutex、RWMutex、WaitGroup、Once、Cond、Pool、Map
  5. atomic 包:原子操作,用于无锁并发编程
  6. Context:传递取消信号、超时和请求范围的值
  7. 并发模式:Pipeline、Fan-out/Fan-in、Worker Pool、生产者-消费者
  8. 常见问题:死锁、泄露、竞态条件

练习

  1. 创建一个并发的下载器,使用 Worker Pool 模式限制并发数
  2. 使用 Pipeline 模式实现数据处理管道(过滤 -> 转换 -> 汇总)
  3. 使用 Context 实现一个带超时的 HTTP 请求客户端
  4. 实现"哲学家就餐"问题,避免死锁
  5. 使用 sync.Map 实现一个并发安全的缓存系统