元素码农
基础
UML建模
数据结构
算法
设计模式
网络
TCP/IP协议
HTTPS安全机制
WebSocket实时通信
数据库
sqlite
postgresql
clickhouse
后端
rust
go
java
php
mysql
redis
mongodb
etcd
nats
zincsearch
前端
浏览器
javascript
typescript
vue3
react
游戏
unity
unreal
C++
C#
Lua
App
android
ios
flutter
react-native
安全
Web安全
测试
软件测试
自动化测试 - Playwright
人工智能
Python
langChain
langGraph
运维
linux
docker
工具
git
svn
🌞
🌙
目录
▶
Go运行时系统
▶
调度器原理
Goroutine调度机制
GMP模型详解
抢占式调度实现
系统线程管理
调度器源码实现分析
▶
网络轮询器
I/O多路复用实现
Epoll事件循环
异步IO处理
▶
系统监控
Sysmon监控线程
死锁检测机制
资源使用监控
▶
内存管理
▶
内存分配器
TCMalloc变体实现
mcache与mspan
对象分配流程
堆内存管理
▶
栈管理
分段栈实现
连续栈优化
栈扩容机制
▶
并发模型
▶
Channel实现
Channel底层结构
发送与接收流程
select实现原理
同步原语实现
▶
原子操作
CPU指令支持
内存顺序保证
sync/atomic实现
▶
并发原语
sync.Map实现原理
WaitGroup实现机制
Mutex锁实现
RWMutex读写锁
Once单次执行
Cond条件变量
信号量代码详解
信号量实现源码分析
信号量应用示例
▶
垃圾回收机制
▶
GC核心算法
三色标记法
三色标记法示例解析
写屏障技术
混合写屏障实现
▶
GC优化策略
GC触发条件
并发标记优化
内存压缩策略
▶
编译与链接
▶
编译器原理
AST构建过程
SSA生成优化
逃逸分析机制
▶
链接器实现
符号解析处理
重定位实现
ELF文件生成
▶
类型系统
▶
基础类型
类型系统概述
基本类型实现
复合类型结构
▶
切片与Map
切片实现原理
切片扩容机制
Map哈希实现
Map扩容机制详解
Map冲突解决
Map并发安全
▶
反射与接口
▶
类型系统
rtype底层结构
接口内存布局
方法表构建
▶
反射机制
ValueOf实现
反射调用代价
类型断言优化
▶
标准库实现
▶
同步原语
sync.Mutex实现
RWMutex原理
WaitGroup机制
▶
Context实现
上下文传播链
取消信号传递
Value存储优化
▶
time定时器实现
Timer实现原理
Ticker周期触发机制
时间轮算法详解
定时器性能优化
定时器源码分析
▶
执行流程
▶
错误异常
错误处理机制
panic与recover
错误传播最佳实践
错误包装与检查
自定义错误类型
▶
延迟执行
defer源码实现分析
▶
性能优化
▶
执行效率优化
栈内存优化
函数内联策略
边界检查消除
字符串优化
切片预分配
▶
内存优化
对象池实现
内存对齐优化
GC参数调优
内存泄漏分析
堆栈分配优化
▶
并发性能优化
Goroutine池化
并发模式优化
锁竞争优化
原子操作应用
Channel效率优化
▶
网络性能优化
网络轮询优化
连接池管理
网络缓冲优化
超时处理优化
网络协议调优
▶
编译优化
编译器优化选项
代码生成优化
链接优化技术
交叉编译优化
构建缓存优化
▶
性能分析工具
性能基准测试
CPU分析技术
内存分析方法
追踪工具应用
性能监控系统
▶
调试与工具
▶
dlv调试
dlv调试器使用
dlv命令详解
dlv远程调试
▶
调试支持
GDB扩展实现
核心转储分析
调试器接口
▶
分析工具
pprof实现原理
trace工具原理
竞态检测实现
▶
跨平台与兼容性
▶
系统抽象层
syscall封装
OS适配层
字节序处理
▶
cgo机制
CGO调用开销
指针传递机制
内存管理边界
▶
工程管理
▶
包管理
Go模块基础
模块初始化配置
依赖版本管理
go.mod文件详解
私有模块配置
代理服务设置
工作区管理
模块版本选择
依赖替换与撤回
模块缓存管理
第三方包版本形成机制
发布时间:
2025-04-25 11:32
↑
☰
# Go语言调度器源码实现分析 ## 概述 Go语言的调度器是其并发模型的核心组件,负责管理和调度goroutine的执行。本文将深入分析Go调度器的源码实现,揭示其内部工作机制和关键数据结构,帮助读者理解Go运行时系统如何高效地管理大量并发任务。 ## 调度器源码结构 ### 核心源码文件 Go调度器的源码主要分布在以下文件中: ``` runtime/ ├── proc.go // 调度器核心实现 ├── runtime2.go // 核心数据结构定义 ├── sched.go // 调度器初始化和管理 ├── stack.go // 栈管理 ├── mstats.go // 统计信息 └── mgc.go // GC相关调度 ``` 其中,`proc.go`是调度器实现的核心,包含了调度循环、goroutine创建和调度等关键逻辑。 ## 核心数据结构 ### G结构体(Goroutine) `G`结构体在`runtime2.go`中定义,代表一个goroutine: ```go type g struct { stack stack // 栈内存范围:[stack.lo, stack.hi) stackguard0 uintptr // 栈溢出检测 m *m // 当前关联的M sched gobuf // 调度信息,保存上下文 atomicstatus uint32 // goroutine状态 goid int64 // goroutine ID schedlink guintptr // 下一个G,形成链表 preempt bool // 抢占标志 lockedm muintptr // 锁定的M paniconfault bool // 是否因panic而终止 // ... 更多字段 } ``` `gobuf`保存了goroutine的执行上下文,用于切换时保存和恢复状态: ```go type gobuf struct { sp uintptr // 栈指针 pc uintptr // 程序计数器 g guintptr // 指向goroutine ctxt unsafe.Pointer // 上下文 ret uintptr // 系统调用返回值 // ... 其他字段 } ``` ### M结构体(Machine) `M`结构体代表一个系统线程: ```go type m struct { g0 *g // 调度栈的goroutine curg *g // 当前运行的goroutine p puintptr // 关联的P nextg *g // 预存的下一个要运行的G spinning bool // M是否在自旋寻找工作 incgo bool // 是否在执行cgo调用 // ... 更多字段 } ``` ### P结构体(Processor) `P`结构体代表处理器资源: ```go type p struct { id int32 status uint32 // P状态:idle/running/syscall/waiting link puintptr // 链表中的下一个P m muintptr // 关联的M mcache *mcache // 内存分配缓存 runqhead uint32 // 本地运行队列头 runqtail uint32 // 本地运行队列尾 runq [256]guintptr // 本地运行队列 runnext guintptr // 下一个优先执行的G // ... 更多字段 } ``` ### 调度器结构体 全局调度器状态保存在`runtime.sched`变量中: ```go type schedt struct { lock mutex // 全局锁 midle muintptr // 空闲M链表 nmidle int32 // 空闲M数量 maxmcount int32 // 最大M数量 pidle puintptr // 空闲P链表 npidle uint32 // 空闲P数量 runq gQueue // 全局运行队列 runqsize int32 // 全局运行队列大小 // ... 更多字段 } ``` ## 调度器初始化 调度器初始化在`runtime.schedinit`函数中完成: ```go func schedinit() { // ... 其他初始化 // 获取处理器核心数 procs := ncpu if n := atoi(gogetenv("GOMAXPROCS")); n > 0 { procs = n } // 调整P的数量 if procresize(procs) != nil { throw("procresize failed") } // ... 其他初始化 } ``` `procresize`函数负责创建和初始化P: ```go func procresize(nprocs int32) *p { // 创建新的P或回收多余的P for i := int32(0); i < nprocs; i++ { pp := allp[i] if pp == nil { pp = new(p) pp.id = i pp.status = _Pgcstop pp.sudogcache = pp.sudogbuf[:0] for i := range pp.deferpool { pp.deferpool[i] = pp.deferpoolbuf[i][:0] } allp[i] = pp } // ... 初始化P } // ... 处理多余的P return allp[0] // 返回第一个P } ``` ## 调度循环实现 调度循环是Go调度器的核心,在`runtime.schedule`函数中实现: ```go func schedule() { _g_ := getg() if _g_.m.locks != 0 { throw("schedule: holding locks") } if _g_.m.lockedg != 0 { // 运行锁定的G lockedg := _g_.m.lockedg.ptr() execute(lockedg, false) return } // 寻找可运行的G gp, inheritTime := findrunnable() // 执行找到的G execute(gp, inheritTime) } ``` `findrunnable`函数负责寻找下一个可运行的goroutine: ```go func findrunnable() (*g, bool) { _g_ := getg() // 1. 尝试从本地运行队列获取 if gp := runqget(_g_.m.p.ptr()); gp != nil { return gp, false } // 2. 尝试从全局运行队列获取 if sched.runqsize != 0 { lock(&sched.lock) gp := globrunqget(_g_.m.p.ptr(), 0) unlock(&sched.lock) if gp != nil { return gp, false } } // 3. 从其他P窃取 if gp := runqsteal(_g_.m.p.ptr(), randp); gp != nil { return gp, false } // 4. 没有找到可运行的G,准备休眠 // ... } ``` `execute`函数负责执行goroutine: ```go func execute(gp *g, inheritTime bool) { _g_ := getg() // 将G与当前M关联 _g_.m.curg = gp gp.m = _g_.m // 切换到goroutine的栈 casgstatus(gp, _Grunnable, _Grunning) gp.waitsince = 0 gp.preempt = false gp.stackguard0 = gp.stack.lo + _StackGuard // 执行goroutine gogo(&gp.sched) } ``` `gogo`是一个汇编函数,负责保存当前上下文并切换到目标goroutine的上下文。 ## Goroutine创建 Goroutine的创建在`runtime.newproc`函数中实现: ```go func newproc(siz int32, fn *funcval) { argp := add(unsafe.Pointer(&fn), sys.PtrSize) gp := getg() pc := getcallerpc() // 创建新的goroutine newg := newproc1(fn, argp, siz, gp, pc) // 将新goroutine放入运行队列 runqput(gp.m.p.ptr(), newg, true) // 如果有空闲的P和M,唤醒它们 if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 { wakep() } } ``` `newproc1`函数负责实际创建goroutine: ```go func newproc1(fn *funcval, argp unsafe.Pointer, siz int32, callergp *g, callerpc uintptr) *g { _g_ := getg() // 从P的本地缓存获取或创建一个新的G _p_ := _g_.m.p.ptr() newg := gfget(_p_) if newg == nil { newg = malg(_StackMin) casgstatus(newg, _Gidle, _Gdead) allgadd(newg) } // 初始化G的栈和参数 totalSize := 4*sys.RegSize + uintptr(siz) + sys.MinFrameSize totalSize = round(totalSize, sys.StackAlign) sp := newg.stack.hi - totalSize spArg := sp // 处理参数 if siz > 0 { memmove(unsafe.Pointer(spArg), argp, uintptr(siz)) } // 设置G的调度信息 memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched)) newg.sched.sp = sp newg.sched.pc = funcPC(goexit) + sys.PCQuantum newg.sched.g = guintptr(unsafe.Pointer(newg)) newg.startpc = fn.fn // 设置G状态为可运行 casgstatus(newg, _Gdead, _Grunnable) return newg } ``` ## 调度器工作窃取 Go调度器采用工作窃取算法平衡各个P的负载。当一个P的本地运行队列为空时,它会尝试从其他P窃取goroutine: ```go func runqsteal(_p_, stealp *p, stealRunq bool) *g { if stealRunq { // 从stealp的本地运行队列窃取一半的G if gp := runqgrab(stealp, 1); gp != nil { return gp } } // 随机选择一个P进行窃取 for i := 0; i < int(gomaxprocs); i++ { if sched.gcwaiting != 0 { return nil } stealp := allp[fastrand() % uint32(gomaxprocs)] if stealp == _p_ { continue } // 尝试窃取 if gp := runqgrab(stealp, 1); gp != nil { return gp } } return nil } ``` `runqgrab`函数负责从目标P的本地运行队列窃取goroutine: ```go func runqgrab(_p_ *p, batch int32) *g { for { h := atomic.LoadAcq(&_p_.runqhead) t := atomic.LoadAcq(&_p_.runqtail) n := t - h if n == 0 { return nil } if n > batch { n = batch } if atomic.CasRel(&_p_.runqhead, h, h+n) { gp := _p_.runq[h%uint32(len(_p_.runq))].ptr() return gp } } } ``` ## M的创建与管理 ### M的创建来源 M代表操作系统线程,在Go运行时系统中,M的创建主要有以下几个来源: 1. **程序初始化时**:当Go程序启动时,运行时会创建初始的M(主线程),用于执行初始化工作和main goroutine。这在`runtime.schedinit`函数中完成: ```go func schedinit() { // ... 其他初始化 // 创建主线程M m0 := &m0 m0.g0 = &g0 m0.curg = &g0 // ... 其他初始化 } ``` 2. **系统调用返回时**:当G执行系统调用时,如果系统调用会阻塞,运行时可能会创建新的M来保持P的利用率: ```go func entersyscall() { // ... 保存状态 // 如果有其他可运行的G,将P交给其他M或创建新的M if sched.gcwaiting == 0 { handoffp(pp) } } func handoffp(pp *p) { // 尝试唤醒或创建M来运行P if sched.pidle != 0 && atomic.Load(&sched.npidle) != 0 { startm(pp, false) } } ``` 3. **创建过多的goroutine时**:当本地和全局运行队列中有大量等待运行的goroutine,而空闲的P没有对应的M时,会创建新的M: ```go func wakep() { // 如果有空闲的P但没有自旋的M,创建或唤醒一个M if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 { startm(nil, false) } } ``` 4. **CGO调用**:当程序执行CGO调用时,可能会创建新的M来处理Go代码,因为CGO可能会阻塞当前线程: ```go func needm(x int32) { // 在CGO调用中需要一个新的M newm(nil, nil) } ``` ### M的创建过程 M的创建过程在`runtime.newm`函数中实现: ```go func newm(fn func(), _p_ *p) { mp := allocm(_p_, fn) mp.nextp.set(_p_) mp.sigmask = initSigmask // 创建一个新的操作系统线程 newosproc(mp) } ``` `allocm`函数负责分配和初始化M结构体: ```go func allocm(_p_ *p, fn func()) *m { // 分配新的m结构体 mp := new(m) // 初始化m的字段 mp.g0 = malg(-1) // 创建g0,用于调度栈 mp.g0.m = mp // 设置调度函数 if fn == nil { fn = mstart } mp.mstartfn = fn // 初始化锁 mcommoninit(mp) // 添加到全局m列表 lock(&sched.lock) if sched.mnext != 0 { mp.id = sched.mnext sched.mnext++ } else { mp.id = sched.maxmcount sched.maxmcount++ } unlock(&sched.lock) return mp } ``` `newosproc`函数负责创建实际的操作系统线程,这是一个平台相关的函数,在Linux上的实现如下: ```go func newosproc(mp *m) { stk := unsafe.Pointer(mp.g0.stack.hi) // 准备线程参数 var attr pthread_attr_t pthread_attr_init(&attr) pthread_attr_setdetachstate(&attr, _PTHREAD_CREATE_DETACHED) pthread_attr_setstack(&attr, stk, 0x10000) // 设置栈大小 // 创建线程 var oset sigset_t sigprocmask(_SIG_SETMASK, &sigset_all, &oset) ret := pthread_create(&attr, funcPC(mstart_stub), unsafe.Pointer(mp)) sigprocmask(_SIG_SETMASK, &oset, nil) if ret != 0 { throw("pthread_create failed") } } ``` `mstart_stub`和`mstart`函数是新线程的入口点: ```go func mstart_stub() { _g_ := getg() // 初始化TLS _g_.m.procid = uint64(pthread_self()) // 调用实际的启动函数 mstart() } func mstart() { _g_ := getg() // 执行用户设置的启动函数或默认进入调度循环 if _g_.m.mstartfn != nil { _g_.m.mstartfn() } // 进入调度循环 mstart1() } ``` ### M与P的绑定 M与P的绑定在`runtime.acquirep`函数中实现: ```go func acquirep(_p_ *p) { // 将P与当前M绑定 _g_ := getg() // 设置P的状态为运行中 atomic.Store(&_p_.status, _Prunning) // 建立M和P的双向关联 _p_.m.set(_g_.m) _g_.m.p.set(_p_) // 更新P的本地缓存 _p_.mcache = _g_.m.mcache } ``` 当M需要执行调度时,会先尝试获取一个P: ```go func startm(_p_ *p, spinning bool) { // 获取一个空闲的M var mp *m lock(&sched.lock) if sched.midle != 0 { // 从空闲M列表获取 mp = sched.midle.ptr() sched.midle = mp.schedlink sched.nmidle-- } else { // 创建新的M unlock(&sched.lock) newm(nil, _p_) return } unlock(&sched.lock) // 设置M的状态 mp.spinning = spinning mp.nextp.set(_p_) // 唤醒M notewakeup(&mp.park) } ``` ### M的休眠与唤醒 当M没有工作可做时,会进入休眠状态: ```go func stopm() { _g_ := getg() if _g_.m.locks != 0 { throw("stopm holding locks") } // 将M放入空闲列表 lock(&sched.lock) _g_.m.nextp = 0 _g_.m.schedlink = sched.midle sched.midle.set(_g_.m) sched.nmidle++ unlock(&sched.lock) // 休眠等待唤醒 notesleep(&_g_.m.park) // 被唤醒后重置状态 noteclear(&_g_.m.park) } ``` ## 系统调用处理 当goroutine进行系统调用时,Go运行时会将其与M解绑,并将P转交给其他M使用,以保持系统的并发度: ```go func entersyscall() { // 保存当前G的状态 _g_ := getg() _g_.m.syscalltick = _g_.m.p.ptr().syscalltick _g_.sysblocktraced = true // 将P与M解绑 pp := _g_.m.p.ptr() pp.m = 0 _g_.m.oldp.set(pp) _g_.m.p = 0 atomic.Store(&pp.status, _Psyscall) // 如果有其他可运行的G,将P交给其他M if sched.gcwaiting == 0 { handoffp(pp) } } func exitsyscall() { _g_ := getg() // 尝试重新获取P oldp := _g_.m.oldp.ptr() _g_.m.oldp = 0 if exitsyscallfast() { // 成功获取P,继续执行 return } // 无法快速获取P,进入慢路径 exitsyscall0(_g_) } ``` ### 系统调用期间的M处理 在系统调用期间,M会保持阻塞状态,但不会释放。当系统调用返回时,M会尝试重新获取P: ```go func exitsyscallfast() bool { _g_ := getg() // 尝试获取原来的P oldp := _g_.m.oldp.ptr() if oldp != nil && oldp.status == _Psyscall && atomic.Cas(&oldp.status, _Psyscall, _Pidle) { // 成功获取原P acquirep(oldp) return true } // 尝试获取任意空闲P for i := 0; i < int(gomaxprocs); i++ { p := allp[i] if p.status == _Pidle && atomic.Cas(&p.status, _Pidle, _Pidle) { acquirep(p) return true } } return false } ``` 如果无法快速获取P,M会将当前G放入全局队列,然后进入休眠状态: ```go func exitsyscall0(gp *g) { _g_ := getg() // 将G放入全局队列 lock(&sched.lock) globrunqput(gp) unlock(&sched.lock) // 尝试唤醒或创建一个M来执行G if atomic.Load(&sched.npidle) != 0 { wakep() } // 当前M进入休眠 stopm() // 被唤醒后重新调度 schedule() } ``` ## 抢占式调度实现 Go 1.14引入了基于信号的抢占式调度,主要在`runtime.preemptone`函数中实现: ```go func preemptone(_p_ *p) bool { mp := _p_.m.ptr() if mp == nil || mp.spinning { return false } gp := mp.curg if gp == nil || gp == mp.g0 { return false } gp.preempt = true // 设置抢占标志 gp.stackguard0 = stackPreempt // 对于阻塞在系统调用的G,发送信号 if mp.syscalltick != 0 && mp.syscalltick != _p_.syscalltick { preemptM(mp) } return true } ``` 在Go 1.14之后,通过向线程发送信号实现抢占: ```go func preemptM(mp *m) { if atomic.Cas(&mp.signalPending, 0, 1) { // 发送SIGURG信号 signal_proc(mp.procid) } } ``` ## 调度器性能优化 ### 运行队列优化 Go调度器使用了多级运行队列来优化性能: 1. **本地运行队列**:每个P维护一个本地运行队列,大小为256 2. **runnext字段**:P中的runnext字段用于快速调度刚创建的goroutine 3. **全局运行队列**:当本地队列满时,会将一半的goroutine转移到全局队列 ### 自旋优化 M在没有找到可运行的G时不会立即休眠,而是会先自旋一段时间: ```go func findrunnable() (*g, bool) { // ... 前面的查找逻辑 // 准备休眠前先自旋 if !_g_.m.spinning { _g_.m.spinning = true atomic.Xadd(&sched.nmspinning, 1) } // 自旋查找工作 for i := 0; i < 4; i++ { // 再次检查全局队列和其他P // ... } // 停止自旋 if _g_.m.spinning { _g_.m.spinning = false atomic.Xadd(&sched.nmspinning, -1) } // 准备休眠 // ... } ``` ## 调度器与GC的交互 Go调度器与垃圾回收器紧密协作,在GC期间会调整调度策略: ```go func gcStart(trigger gcTrigger) { // ... 其他GC初始化 // 停止所有P stopTheWorldWithSema() // 启动GC工作 // ... // 重新启动世界 startTheWorldWithSema() } ``` `stopTheWorldWithSema`函数会暂停所有P的执行,等待GC完成: ```go func stopTheWorldWithSema() { _g_ := getg() // 设置GC等待标志 sched.gcwaiting.Store(1) // 抢占所有P preemptall() // 等待所有P停止 for i := 0; i < int(gomaxprocs); i++ { p := allp[i] for p.status != _Pgcstop { // 等待P进入GC停止状态 } } } ``` ## 总结 Go调度器的源码实现体现了其设计理念:高效、可扩展且对开发者透明。通过GMP模型、工作窃取算法、多级运行队列等技术,Go调度器能够高效地管理大量goroutine,充分利用多核处理器资源。 理解Go调度器的源码实现,不仅有助于深入理解Go的并发模型,还能帮助开发者编写更高效的并发程序,避免常见的并发陷阱。随着Go语言的发展,调度器也在不断优化,如引入抢占式调度、非均匀内存访问(NUMA)感知等特性,进一步提升了Go程序的性能和可靠性。 通过本文的分析,我们可以看到Go调度器是一个精心设计的系统,它在简洁性和高性能之间取得了良好的平衡,为Go语言的并发编程模型提供了坚实的基础。