当前位置: 首页 > news >正文

Go语言并发模式深度解析

Go语言并发模式深度解析

Go语言以其独特的并发模型而闻名,goroutine和channel是Go并发编程的核心。本文将深入探讨Go语言中的并发模式和最佳实践。

一、Goroutine原理

1.1 Goroutine与线程的区别

func main() { // 创建10万个goroutine for i := 0; i < 100000; i++ { go func(id int) { fmt.Printf("Goroutine %d\n", id) }(i) } }
特性GoroutineOS线程
栈大小初始2KB,可增长通常1MB
创建开销~2KB栈 + 少量结构内存分配 + 内核调度
上下文切换用户态,快速内核态,较慢
数量限制可创建数万通常数百

1.2 Goroutine调度器

// GPM调度模型 type g struct { stack stack stackguard0 uintptr stackguard1 uintptr _panic *_panic _defer *_defer m *m sched gobuf } type m struct { g0 *g curg *g p puintptr nextp puintptr alllink *m schedtick int32 } type p struct { mu mutex id int32 status uint32 m *m runqhead uint32 runqtail uint32 runq [256]guintptr gFree *g }

1.3 调度策略

// work-stealing算法 func schedule() { for { gp := runqget(_g_.m.p.ptr()) if gp == nil { gp = findrunnable() } execute(gp, false) } } func runqget(p *p) *g { for { head := atomic.Load(&p.runqhead) tail := p.runqtail if head == tail { return nil } gp := p.runq[head%uint32(len(p.runq))].ptr() if atomic.Cas(&p.runqhead, head, head+1) { return gp } } }

二、Channel深度解析

2.1 Channel类型

type hchan struct { qcount uint // total data in the queue dataqsiz uint // size of the circular queue buf unsafe.Pointer // points to an array of dataqsiz elements elemsize uint16 closed uint32 elemtype *_type // element type sendx uint // send index recvx uint // receive index recvq waitq // list of recv waiters sendq waitq // list of send waiters lock mutex } type waitq struct { first *sudog last *sudog }

2.2 Channel操作

// 发送操作 func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { if c == nil { if !block { return false } gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2) throw("unreachable") } lock(&c.lock) if c.closed != 0 { unlock(&c.lock) panic(plainError("send on closed channel")) } if sg := c.recvq.dequeue(); sg != nil { send(c, sg, ep, func() { unlock(&c.lock) }, 3) return true } if c.qcount < c.dataqsiz { qp := chanbuf(c, c.sendx) typedmemmove(c.elemtype, qp, ep) c.sendx++ if c.sendx == c.dataqsiz { c.sendx = 0 } c.qcount++ unlock(&c.lock) return true } if !block { unlock(&c.lock) return false } gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = cputicks() } mysg.elem = ep mysg.waitlink = nil mysg.g = gp mysg.isSelect = false mysg.c = c gp.waiting = mysg gp.param = nil c.sendq.enqueue(mysg) gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2) return true }

2.3 Channel模式

// 模式1:生产者-消费者 func producer(ch chan<- int) { for i := 0; i < 10; i++ { ch <- i } close(ch) } func consumer(ch <-chan int) { for val := range ch { fmt.Println(val) } } // 模式2:扇出 func fanOut(input <-chan int, n int) []<-chan int { channels := make([]<-chan int, n) for i := 0; i < n; i++ { channels[i] = process(input) } return channels } func process(input <-chan int) <-chan int { out := make(chan int) go func() { for val := range input { out <- val * 2 } close(out) }() return out } // 模式3:扇入 func fanIn(channels ...<-chan int) <-chan int { out := make(chan int) var wg sync.WaitGroup wg.Add(len(channels)) for _, ch := range channels { go func(c <-chan int) { defer wg.Done() for val := range c { out <- val } }(ch) } go func() { wg.Wait() close(out) }() return out }

三、同步原语

3.1 Mutex与RWMutex

type SafeCounter struct { mu sync.Mutex value int } func (c *SafeCounter) Inc() { c.mu.Lock() defer c.mu.Unlock() c.value++ } func (c *SafeCounter) Get() int { c.mu.Lock() defer c.mu.Unlock() return c.value } // RWMutex适合读多写少场景 type SharedData struct { mu sync.RWMutex data map[string]string } func (d *SharedData) Read(key string) (string, bool) { d.mu.RLock() defer d.mu.RUnlock() val, ok := d.data[key] return val, ok } func (d *SharedData) Write(key, value string) { d.mu.Lock() defer d.mu.Unlock() d.data[key] = value }

3.2 WaitGroup

func processItems(items []Item) error { var wg sync.WaitGroup errs := make(chan error, len(items)) for _, item := range items { wg.Add(1) go func(i Item) { defer wg.Done() if err := process(i); err != nil { errs <- err } }(item) } go func() { wg.Wait() close(errs) }() for err := range errs { return err } return nil }

3.3 Once

type Config struct { once sync.Once loaded bool data map[string]string } func (c *Config) Load() { c.once.Do(func() { c.data = loadConfigFromFile() c.loaded = true }) }

3.4 Cond

type Queue struct { mu sync.Mutex cond *sync.Cond items []interface{} } func NewQueue() *Queue { q := &Queue{ items: make([]interface{}, 0), } q.cond = sync.NewCond(&q.mu) return q } func (q *Queue) Enqueue(item interface{}) { q.mu.Lock() defer q.mu.Unlock() q.items = append(q.items, item) q.cond.Signal() } func (q *Queue) Dequeue() interface{} { q.mu.Lock() defer q.mu.Unlock() for len(q.items) == 0 { q.cond.Wait() } item := q.items[0] q.items = q.items[1:] return item }

四、并发设计模式

4.1 Worker Pool模式

type WorkerPool struct { workers int tasks chan Task wg sync.WaitGroup } func NewWorkerPool(workers int) *WorkerPool { return &WorkerPool{ workers: workers, tasks: make(chan Task, 100), } } func (p *WorkerPool) Start() { for i := 0; i < p.workers; i++ { p.wg.Add(1) go p.worker() } } func (p *WorkerPool) worker() { defer p.wg.Done() for task := range p.tasks { task.Execute() } } func (p *WorkerPool) Submit(task Task) { p.tasks <- task } func (p *WorkerPool) Stop() { close(p.tasks) p.wg.Wait() }

4.2 Context模式

func fetchData(ctx context.Context, url string) (string, error) { req, err := http.NewRequestWithContext(ctx, "GET", url, nil) if err != nil { return "", err } resp, err := http.DefaultClient.Do(req) if err != nil { return "", err } defer resp.Body.Close() return io.ReadAll(resp.Body) } func main() { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() data, err := fetchData(ctx, "http://example.com") if err != nil { log.Fatal(err) } fmt.Println(data) }

4.3 Pipeline模式

func gen(nums ...int) <-chan int { out := make(chan int) go func() { for _, n := range nums { out <- n } close(out) }() return out } func sq(in <-chan int) <-chan int { out := make(chan int) go func() { for n := range in { out <- n * n } close(out) }() return out } func filterEven(in <-chan int) <-chan int { out := make(chan int) go func() { for n := range in { if n%2 == 0 { out <- n } } close(out) }() return out } func main() { c := gen(1, 2, 3, 4, 5) s := sq(c) f := filterEven(s) for n := range f { fmt.Println(n) } }

五、并发安全数据结构

5.1 并发安全Map

type ConcurrentMap struct { mu sync.RWMutex items map[string]interface{} } func NewConcurrentMap() *ConcurrentMap { return &ConcurrentMap{ items: make(map[string]interface{}), } } func (m *ConcurrentMap) Get(key string) (interface{}, bool) { m.mu.RLock() defer m.mu.RUnlock() val, ok := m.items[key] return val, ok } func (m *ConcurrentMap) Set(key string, value interface{}) { m.mu.Lock() defer m.mu.Unlock() m.items[key] = value } func (m *ConcurrentMap) Delete(key string) { m.mu.Lock() defer m.mu.Unlock() delete(m.items, key) }

5.2 并发安全队列

type ConcurrentQueue struct { mu sync.Mutex items []interface{} } func NewConcurrentQueue() *ConcurrentQueue { return &ConcurrentQueue{ items: make([]interface{}, 0), } } func (q *ConcurrentQueue) Enqueue(item interface{}) { q.mu.Lock() defer q.mu.Unlock() q.items = append(q.items, item) } func (q *ConcurrentQueue) Dequeue() (interface{}, bool) { q.mu.Lock() defer q.mu.Unlock() if len(q.items) == 0 { return nil, false } item := q.items[0] q.items = q.items[1:] return item, true } func (q *ConcurrentQueue) Len() int { q.mu.RLock() defer q.mu.RUnlock() return len(q.items) }

六、并发最佳实践

6.1 避免共享状态

// 不好的做法:共享状态 var counter int var mu sync.Mutex func increment() { mu.Lock() counter++ mu.Unlock() } // 好的做法:通过channel传递状态 func counterWithChannel(start int) <-chan int { out := make(chan int) go func() { for i := start; ; i++ { out <- i } }() return out }

6.2 优雅关闭

func worker(ctx context.Context, jobs <-chan Job) { for { select { case job, ok := <-jobs: if !ok { return } process(job) case <-ctx.Done(): return } } }

6.3 错误处理

type Result struct { Value interface{} Err error } func processWithErrorHandling(ctx context.Context, jobs <-chan Job) <-chan Result { results := make(chan Result) go func() { defer close(results) for job := range jobs { select { case <-ctx.Done(): return default: val, err := process(job) results <- Result{Value: val, Err: err} } } }() return results }

七、总结

Go语言的并发模型提供了强大而简洁的并发编程能力:

  1. Goroutine:轻量级执行单元,高效的M:N调度
  2. Channel:类型安全的通信机制,实现无锁并发
  3. 同步原语:Mutex、RWMutex、WaitGroup、Once、Cond
  4. 设计模式:Worker Pool、Pipeline、Fan-Out/Fan-In
  5. 最佳实践:避免共享状态、优雅关闭、错误处理

掌握这些知识,可以编写出高效、安全的并发Go程序。

http://www.jsqmd.com/news/927290/

相关文章:

  • 2026年知名的实力派窗帘品牌/原创窗帘品牌可靠供应商推荐 - 品牌宣传支持者
  • 2026年云南昆明三角梅培育基地/昆明基地/昆明绣球基地/昆明亚麻基地采购必看榜 - 行业平台推荐
  • 神经网络与深度学习第四周学习笔记(3/4)
  • 别再折腾环境了!手把手教你用Vivado 2018.3和Modelsim 22.04搞定联合仿真(附库编译避坑指南)
  • 保姆级教程:在Deepin V23上配置xrdp+x11vnc,实现Windows远程桌面稳定连接
  • 2026年5月企业AI操作系统推荐:TOP5评测市场份额专业选择指南办公协同案例
  • 别再手动写多选了!手把手教你封装一个uView Picker多选组件(附完整源码)
  • 基于Python+Django的私有化云笔记系统:从痛点分析到完整实现
  • 2026年口碑好的肥东县窗帘/庐阳区窗帘/肥西县窗帘厂家精选合集 - 行业平台推荐
  • 跨境电商独立站2026最新从0-1完整搭建流程
  • AI时代新型攻击:从对抗样本到数据投毒的防御体系重构
  • 从0到1吃透Pandas!Python数据分析零基础实战教程
  • 8张RTX 4090实测:MedicalGPT项目全流程训练中的显存分配与参数调优实战记录
  • 基于助睿平台的浏览器市场与用户画像分析-数据加工
  • 2026年口碑好的基地/绣球基地/亚麻基地/三角梅养殖基地精选推荐榜 - 品牌宣传支持者
  • 2026年热门的岩棉净化板/甘肃净化板厂家精选合集 - 品牌宣传支持者
  • 保姆级教程:用Python脚本将OPIXray/HIXray安检X光数据集转成YOLO格式(附完整代码)
  • 从‘刻舟求剑’到‘乒乓切换’:图解STM32H7中DMA双缓存与Cache的协同工作
  • 2026年评价高的庐阳区窗帘/合肥窗帘/包河区窗帘/新站区窗帘长期合作厂家推荐 - 品牌宣传支持者
  • 广度优先搜索 (BFS)
  • 第 5 周——诗词创作模块后端接口对接
  • 2026年比较好的梁山水处理乳品设备/梁山乳品设备/离心机乳品设备/均质机乳品设备精选推荐公司 - 行业平台推荐
  • AI时代密码安全新策略:从随机密码到密码管理器的全面防御
  • 2026年质量好的共挤膜气泡膜卷/彩色气泡膜卷可靠供应商推荐 - 行业平台推荐
  • 在WSL2的Ubuntu 22.04上,用Intel OneAPI 2024编译VASP 6.3.2的保姆级教程
  • 别再只用Aircrack了!横向评测Kismet与airodump-ng:无线网络扫描工具到底怎么选?
  • 2026年知名的水表箱/SMC水表箱/防冻水表箱优质厂家汇总推荐 - 行业平台推荐
  • 用STM32F103和继电器DIY智能家居:低成本改造台灯与风扇的保姆级教程
  • 从开源哲学到AI伦理:模块化、透明性与协作如何重塑技术未来
  • 2026年义乌本地快递气泡袋/气泡袋/气泡袋定制长期合作厂家推荐 - 行业平台推荐