8. Go 并发编程

Goroutine

基本概念

  1. Goroutine 是轻量级线程
  2. 由 Go 运行时管理
  3. 比线程更高效

创建和使用

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
// 启动 goroutine
go func() {
    fmt.Println("在 goroutine 中运行")
}()

// 带参数的 goroutine
go func(msg string) {
    fmt.Println(msg)
}("Hello")

// 等待 goroutine 完成
var wg sync.WaitGroup
wg.Add(1)
go func() {
    defer wg.Done()
    fmt.Println("goroutine 完成")
}()
wg.Wait()

Channel

Channel 类型

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
// 无缓冲 channel
ch1 := make(chan int)

// 有缓冲 channel
ch2 := make(chan int, 10)

// 只读 channel
ch3 := make(<-chan int)

// 只写 channel
ch4 := make(chan<- int)

Channel 操作

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
// 发送数据
ch <- 42

// 接收数据
value := <-ch

// 关闭 channel
close(ch)

// 遍历 channel
for v := range ch {
    fmt.Println(v)
}

同步原语

WaitGroup

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
var wg sync.WaitGroup

func worker(id int) {
    defer wg.Done()
    fmt.Printf("Worker %d 开始\n", id)
    time.Sleep(time.Second)
    fmt.Printf("Worker %d 完成\n", id)
}

func main() {
    for i := 1; i <= 5; i++ {
        wg.Add(1)
        go worker(i)
    }
    wg.Wait()
}

Mutex

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
type SafeCounter struct {
    mu    sync.Mutex
    count int
}

func (c *SafeCounter) Increment() {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.count++
}

func (c *SafeCounter) GetCount() int {
    c.mu.Lock()
    defer c.mu.Unlock()
    return c.count
}

RWMutex

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
type SafeMap struct {
    mu    sync.RWMutex
    data  map[string]interface{}
}

func (m *SafeMap) Get(key string) interface{} {
    m.mu.RLock()
    defer m.mu.RUnlock()
    return m.data[key]
}

func (m *SafeMap) Set(key string, value interface{}) {
    m.mu.Lock()
    defer m.mu.Unlock()
    m.data[key] = value
}

并发模式

工作池模式

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func worker(id int, jobs <-chan int, results chan<- int) {
    for j := range jobs {
        fmt.Printf("worker %d 开始处理任务 %d\n", id, j)
        time.Sleep(time.Second)
        results <- j * 2
        fmt.Printf("worker %d 完成任务 %d\n", id, j)
    }
}

func main() {
    jobs := make(chan int, 100)
    results := make(chan int, 100)

    // 启动工作池
    for w := 1; w <= 3; w++ {
        go worker(w, jobs, results)
    }

    // 发送任务
    for j := 1; j <= 9; j++ {
        jobs <- j
    }
    close(jobs)

    // 收集结果
    for a := 1; a <= 9; a++ {
        fmt.Println(<-results)
    }
}

发布订阅模式

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
type PubSub struct {
    mu     sync.RWMutex
    subs   map[string][]chan string
}

func NewPubSub() *PubSub {
    return &PubSub{
        subs: make(map[string][]chan string),
    }
}

func (ps *PubSub) Subscribe(topic string) chan string {
    ps.mu.Lock()
    defer ps.mu.Unlock()
    ch := make(chan string)
    ps.subs[topic] = append(ps.subs[topic], ch)
    return ch
}

func (ps *PubSub) Publish(topic string, msg string) {
    ps.mu.RLock()
    defer ps.mu.RUnlock()
    for _, ch := range ps.subs[topic] {
        go func(ch chan string) {
            ch <- msg
        }(ch)
    }
}

并发安全

原子操作

1
2
3
4
5
6
7
8
9
var counter int64

func increment() {
    atomic.AddInt64(&counter, 1)
}

func getCounter() int64 {
    return atomic.LoadInt64(&counter)
}

并发安全的 Map

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
var syncMap sync.Map

// 存储键值对
syncMap.Store("key", "value")

// 加载值
value, ok := syncMap.Load("key")

// 删除键值对
syncMap.Delete("key")

// 遍历
syncMap.Range(func(key, value interface{}) bool {
    fmt.Printf("%v: %v\n", key, value)
    return true
})

参考资源

57.12k 字
43篇文章