func (m *Mutex) Lock() {
	// Fast path: grab unlocked mutex.
	// First acquisition: state is 0, so a single CAS sets the locked bit.
	if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
		if race.Enabled {
			race.Acquire(unsafe.Pointer(m))
		}
		return
	}
	// Slow path (outlined so that the fast path can be inlined)
	// Not the first acquisition: must arbitrate between blocked goroutines
	// and newly arriving ones.
	m.lockSlow()
}
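// For reference: the state bits and constants the code above and below rely
// on, as they appear in sync/mutex.go for this era of Go. starvationThresholdNs
// is the 1ms wait after which a waiter flips the mutex into starvation mode.
const (
	mutexLocked = 1 << iota // mutex is locked
	mutexWoken
	mutexStarving
	mutexWaiterShift = iota

	starvationThresholdNs = 1e6
)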
func (m *Mutex) lockSlow() {
	var waitStartTime int64
	starving := false
	awoke := false
	iter := 0
	old := m.state
	for {
		// Don't spin in starvation mode, ownership is handed off to waiters
		// so we won't be able to acquire the mutex anyway.
		// In normal (non-starvation) mode we may spin; in starvation mode the
		// lock is handed directly to a blocked goroutine.
		if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) { // sync_runtime_canSpin
			// Active spinning makes sense.
			// Try to set mutexWoken flag to inform Unlock
			// to not wake other blocked goroutines.
			// While spinning, if the woken flag is unset and there are blocked
			// goroutines, set it so Unlock won't wake another waiter.
			if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
				atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
				awoke = true
			}
			// Spin, waiting for the lock to be released.
			runtime_doSpin() // sync_runtime_doSpin
			iter++
			old = m.state
			continue
		}
		new := old
		// Don't try to acquire starving mutex, new arriving goroutines must queue.
		// In non-starvation mode, try to take the lock directly.
		if old&mutexStarving == 0 {
			new |= mutexLocked
		}
		// Record one more waiter in the state word.
		if old&(mutexLocked|mutexStarving) != 0 {
			new += 1 << mutexWaiterShift
		}
		// The current goroutine switches mutex to starvation mode.
		// But if the mutex is currently unlocked, don't do the switch.
		// Unlock expects that starving mutex has waiters, which will not
		// be true in this case.
		if starving && old&mutexLocked != 0 {
			new |= mutexStarving
		}
		if awoke {
			// Clear the woken flag in the new state value.
			// The goroutine has been woken from sleep,
			// so we need to reset the flag in either case.
			if new&mutexWoken == 0 {
				throw("sync: inconsistent mutex state")
			}
			new &^= mutexWoken
		}
		if atomic.CompareAndSwapInt32(&m.state, old, new) {
			if old&(mutexLocked|mutexStarving) == 0 {
				// The mutex was free and not starving, so the CAS acquired it.
				break // locked the mutex with CAS
			}
			// If we were already waiting before, queue at the front of the queue.
			// A goroutine that has already waited goes to the head of the wait
			// queue; a newcomer goes to the tail.
			queueLifo := waitStartTime != 0
			if waitStartTime == 0 {
				waitStartTime = runtime_nanotime()
			}
			runtime_SemacquireMutex(&m.sema, queueLifo, 1)
			// Check whether we have waited past the starvation threshold.
			starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
			old = m.state
			// Starvation mode:
			if old&mutexStarving != 0 {
				// If this goroutine was woken and mutex is in starvation mode,
				// ownership was handed off to us but mutex is in somewhat
				// inconsistent state: mutexLocked is not set and we are still
				// accounted as waiter. Fix that.
				if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
					throw("sync: inconsistent mutex state")
				}
				// Take the lock and remove ourselves from the waiter count.
				delta := int32(mutexLocked - 1<<mutexWaiterShift)
				// If we are no longer starving, or we were the last waiter,
				// exit starvation mode.
				if !starving || old>>mutexWaiterShift == 1 {
					// Exit starvation mode.
					// Critical to do it here and consider wait time.
					// Starvation mode is so inefficient, that two goroutines
					// can go lock-step infinitely once they switch mutex
					// to starvation mode.
					delta -= mutexStarving
				}
				atomic.AddInt32(&m.state, delta)
				break
			}
			// We didn't get the lock; we were woken, so reset and spin again.
			awoke = true
			iter = 0
		} else {
			// State changed under us; reload and retry.
			old = m.state
		}
	}

	if race.Enabled {
		race.Acquire(unsafe.Pointer(m))
	}
}
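// Reading m.state in the branches above is easier with the bit layout in
// mind. A hypothetical helper (decodeState is not part of package sync)
// that unpacks a raw state word, assuming the constants shown earlier:
func decodeState(s int32) (locked, woken, starving bool, waiters int32) {
	return s&mutexLocked != 0,
		s&mutexWoken != 0,
		s&mutexStarving != 0,
		s >> mutexWaiterShift
}

// Example: state 0b1011 decodes as locked, woken, not starving, one waiter.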
// Active spinning for sync.Mutex.
//go:linkname sync_runtime_canSpin sync.runtime_canSpin
//go:nosplit
func sync_runtime_canSpin(i int) bool {
	// sync.Mutex is cooperative, so we are conservative with spinning.
	// Spin only few times and only if running on a multicore machine and
	// GOMAXPROCS>1 and there is at least one other running P and local runq is empty.
	// As opposed to runtime mutex we don't do passive spinning here,
	// because there can be work on global runq or on other Ps.
	if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
		return false
	}
	if p := getg().m.p.ptr(); !runqempty(p) {
		return false
	}
	return true
}
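// Its companion runtime_doSpin maps to this routine in runtime/proc.go of
// the same era; active_spin is 4 and active_spin_cnt is 30, so a spinning
// goroutine executes roughly 30 PAUSE-like instructions per iteration.
//go:linkname sync_runtime_doSpin sync.runtime_doSpin
//go:nosplit
func sync_runtime_doSpin() {
	procyield(active_spin_cnt)
}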
// Unlock unlocks m.
// It is a run-time error if m is not locked on entry to Unlock.
//
// A locked Mutex is not associated with a particular goroutine.
// It is allowed for one goroutine to lock a Mutex and then
// arrange for another goroutine to unlock it.
func (m *Mutex) Unlock() {
	if race.Enabled {
		_ = m.state
		race.Release(unsafe.Pointer(m))
	}

	// Fast path: drop lock bit.
	// A simple unlock just clears the locked bit.
	new := atomic.AddInt32(&m.state, -mutexLocked)
	if new != 0 {
		// Outlined slow path to allow inlining the fast path.
		// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
		m.unlockSlow(new)
	}
}
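// The doc comment's claim that a locked Mutex is not tied to a goroutine
// can be seen in a minimal, contrived demo (not from the Go sources):
package main

import (
	"fmt"
	"sync"
)

func main() {
	var mu sync.Mutex
	mu.Lock()
	done := make(chan struct{})
	go func() {
		mu.Unlock() // legal: Unlock need not run in the locking goroutine
		close(done)
	}()
	<-done
	mu.Lock() // succeeds once the other goroutine has unlocked
	fmt.Println("reacquired")
	mu.Unlock()
}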
func (m *Mutex) unlockSlow(new int32) {
	if (new+mutexLocked)&mutexLocked == 0 {
		throw("sync: unlock of unlocked mutex")
	}
	if new&mutexStarving == 0 {
		// Non-starvation mode.
		old := new
		for {
			// If there are no waiters or a goroutine has already
			// been woken or grabbed the lock, no need to wake anyone.
			// In starvation mode ownership is directly handed off from unlocking
			// goroutine to the next waiter. We are not part of this chain,
			// since we did not observe mutexStarving when we unlocked the mutex above.
			// So get off the way.
			if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
				return
			}
			// Grab the right to wake someone.
			new = (old - 1<<mutexWaiterShift) | mutexWoken
			if atomic.CompareAndSwapInt32(&m.state, old, new) {
				runtime_Semrelease(&m.sema, false, 1)
				return
			}
			old = m.state
		}
	} else {
		// Starving mode: handoff mutex ownership to the next waiter.
		// Note: mutexLocked is not set, the waiter will set it after wakeup.
		// But mutex is still considered locked if mutexStarving is set,
		// so new coming goroutines won't acquire it.
		runtime_Semrelease(&m.sema, true, 1)
	}
}
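// The throw at the top of unlockSlow is what a double unlock trips; the
// process aborts with an unrecoverable fatal error, not a panic that a
// deferred recover could catch. A deliberately broken snippet:
package main

import "sync"

func main() {
	var mu sync.Mutex
	mu.Unlock() // fatal error: sync: unlock of unlocked mutex
}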
// runtime/sema.go
// A semaRoot holds a balanced tree of sudog with distinct addresses (s.elem).
// Each of those sudog may in turn point (through s.waitlink) to a list
// of other sudogs waiting on the same address.
// The operations on the inner lists of sudogs with the same address
// are all O(1). The scanning of the top-level semaRoot list is O(log n),
// where n is the number of distinct addresses with goroutines blocked
// on them that hash to the given semaRoot.
// See golang.org/issue/17953 for a program that worked badly
// before we introduced the second level of list, and test/locklinear.go
// for a test that exercises this.
type semaRoot struct {
	lock  mutex
	treap *sudog // root of balanced tree of unique waiters.
	nwait uint32 // Number of waiters. Read w/o the lock.
}
// Prime to not correlate with any user patterns.
const semTabSize = 251

var semtable [semTabSize]struct {
	root semaRoot
	pad  [cpu.CacheLinePadSize - unsafe.Sizeof(semaRoot{})]byte
}
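// A semaphore address such as &m.sema is hashed into semtable to pick its
// semaRoot; if memory serves, the lookup in runtime/sema.go of this era is:
func semroot(addr *uint32) *semaRoot {
	return &semtable[(uintptr(unsafe.Pointer(addr))>>3)%semTabSize].root
}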
// runtime/runtime2.go
type mutex struct {
	// Futex-based impl treats it as uint32 key,
	// while sema-based impl as M* waitm.
	// Used to be a union, but unions break precise GC.
	key uintptr
}

type sudog struct {
	// The following fields are protected by the hchan.lock of the
	// channel this sudog is blocking on. shrinkstack depends on
	// this for sudogs involved in channel ops.

	g *g

	// isSelect indicates g is participating in a select, so
	// g.selectDone must be CAS'd to win the wake-up race.
	isSelect bool
	next     *sudog
	prev     *sudog
	elem     unsafe.Pointer // data element (may point to stack)

	// The following fields are never accessed concurrently.
	// For channels, waitlink is only accessed by g.
	// For semaphores, all fields (including the ones above)
	// are only accessed when holding a semaRoot lock.

	acquiretime int64
	releasetime int64
	ticket      uint32
	parent      *sudog // semaRoot binary tree
	waitlink    *sudog // g.waiting list or semaRoot
	waittail    *sudog // semaRoot
	c           *hchan // channel
}
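// A minimal, self-contained model (illustrative only, not runtime code) of
// the two-level structure described above: the top level keys on distinct
// addresses, the second level is a list of waiters on the same address. The
// real runtime uses a treap of sudogs linked via waitlink; a map and slice
// stand in here so the queueLifo behavior from lockSlow is visible.
package main

import "fmt"

type waiter struct{ id int }

type semaRootModel struct {
	byAddr map[uintptr][]*waiter // real code: treap of sudog + waitlink lists
}

func (r *semaRootModel) enqueue(addr uintptr, w *waiter, lifo bool) {
	if lifo {
		// A waiter that has already waited once goes to the front.
		r.byAddr[addr] = append([]*waiter{w}, r.byAddr[addr]...)
	} else {
		// A newcomer goes to the back.
		r.byAddr[addr] = append(r.byAddr[addr], w)
	}
}

func (r *semaRootModel) dequeue(addr uintptr) *waiter {
	l := r.byAddr[addr]
	if len(l) == 0 {
		return nil
	}
	w := l[0]
	r.byAddr[addr] = l[1:]
	return w
}

func main() {
	r := &semaRootModel{byAddr: map[uintptr][]*waiter{}}
	r.enqueue(0x100, &waiter{id: 1}, false)
	r.enqueue(0x100, &waiter{id: 2}, false)
	r.enqueue(0x100, &waiter{id: 3}, true) // re-waiting goroutine jumps ahead
	for w := r.dequeue(0x100); w != nil; w = r.dequeue(0x100) {
		fmt.Println("woke waiter", w.id) // prints 3, then 1, then 2
	}
}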