go语言使用互斥锁进行同步
我们可以利用互斥锁来保护代码中的关键部分,从而确保每次只能有一个goroutine访问共享资源。这样一来,就能避免竞争条件的问题。几乎所有支持并发编程的语言中,都使用了类似互斥锁的机制。在本章中,我们首先会了解互斥锁的功能。之后,还会探讨
一种名为“读写互斥锁”的特殊类型的互斥锁。
读写互斥锁能在我们只需要在修改共享资源时才阻止并发操作的情况下,帮助我们提升性能。利用这种机制,我们可以允许多个进程同时读取共享资源,而与此同时,又能确保只有某个进程能够进行写入操作。我们将通过一个示例来了解读写互斥锁的用法,还会学习其内部原理,并亲手实现一个类似的互斥锁。
使用互斥量保护临界区
如果我们能确保每次只有一条执行线程能够访问那些关键代码段,那该多好啊。这就是互斥锁的作用所在。可以把互斥锁看作是一种“物理锁”,它能防止多个协程同时访问代码中的特定部分。只要每次只有一个协程在访问关键代码段,就能避免竞争条件的问题。因为,竞争条件只有在那两个或多个协程同时尝试访问同一资源时才会发生。
我们可以利用互斥锁来标记代码中临界区的起始和结束位置。当某个协程进入由互斥锁保护的临界区时,它首先会通过程序代码中的指令明确地锁定该互斥锁。之后,协程开始执行临界区内的代码。当执行完成后,它会释放互斥锁,这样其他协程就可以进入该临界区了。
如果另一个协程试图锁定一个已经被占用的互斥锁,那么该协程将被挂起,直到互斥锁被释放为止。如果有多个协程都在等待锁的释放,那么只会有一个协程能够继续执行,而那个协程就是下一个成功获得互斥锁的协程。
所谓“互斥锁”,其实就是一种用于防止竞争条件的并发控制机制。它确保同一时间只能有一个执行单元(比如一个协程或内核级线程)进入临界区域。如果有两个执行单元同时试图获取对该互斥锁的访问权,那么互斥锁的规则会保证只有一个协程能够成功获取访问权,而另一个执行单元则必须等待,直到互斥锁再次变为可用状态。
在 Go 语言中,互斥锁的功能由sync包中的Mutex类提供。通过Mutex类,我们可以使用Lock()和Unlock()这两个方法来实现对资源的锁定与解锁操作。
packagemainimport("fmt""sync""time")funcstingy(money*int,mutex*sync.Mutex){fori:=0;i<1000000;i++{// 进入临界区之前,先锁定互斥变量mutex.Lock()*money+=10// 离开临界区之后,解锁互斥变量mutex.Unlock()}fmt.Println("Stingy Done")}funcspendy(money*int,mutex*sync.Mutex){fori:=0;i<1000000;i++{mutex.Lock()*money-=10mutex.Unlock()}fmt.Println("Spendy Done")}funcmain(){money:=100mutex:=sync.Mutex{}gostingy(&money,&mutex)gospendy(&money,&mutex)time.Sleep(2*time.Second)mutex.Lock()fmt.Println("Money in bank account: ",money)mutex.Unlock()}在我们的主函数中,当各个协程执行完成后,我们在读取“money”变量时也会使用互斥锁来确保同步。虽然由于我们设置了等待时间来确保所有协程都已完成执行,因此出现竞争条件的概率很低。但即便如此,保护共享资源始终是良好的编程习惯。使用互斥锁(以及后面章节中介绍的其他同步机制),可以确保协程能够读取到该变量的最新值。
请注意:我们必须保护所有关键代码段,包括那些goroutine仅用于读取共享资源的区域。编译器的优化措施可能会重新调整指令的执行顺序,从而导致指令以不同的方式被执行。通过使用适当的同步机制,比如互斥锁,我们可以确保自己能够读取到共享资源的最新版本。
互斥锁是如何实现的呢?
互斥锁的实现通常需要操作系统和硬件的支持。如果系统只有一个处理器,那么我们可以通过在某个线程持有锁时禁止中断的方式来实现互斥锁。这样一来,其他线程就不会干扰当前线程的执行,从而避免冲突。不过,这种做法并不理想,因为编写不当的代码可能会导致整个系统被阻塞,让其他进程和线程都无法正常运行。此外,如果系统有多个处理器,这种做法也是行不通的,因为其他线程可以在其他CPU上并行执行。
互斥量的实现需要硬件的支持,以便能够进行原子的测试和设置操作。通过这种操作,某个执行单元可以检查内存中的值是否与预期相符;如果相符,它就会将内存中的值设置为“已锁定”状态。硬件确保了这一测试和设置操作的原子性——也就是说,在操作完成之前,其他执行单元无法访问该内存位置。早期的硬件实现方式是通过封锁整个总线来确保这种原子性,这样其他处理器就无法同时使用该内存。如果另一个执行单元尝试进行同样的操作,却发现内存已经被设置为“已锁定”状态,那么操作系统会阻止该线程的继续执行,直到内存状态再次变为“可用”为止。
互斥锁与顺序处理
当然,当协程的数量超过两个时,我们也可以使用互斥锁来解决同步问题。
请注意:使用互斥锁会限制程序的并发程度。在互斥锁被锁定和解锁之间的代码段,任何时候都只能由一个协程来执行,这样一来,这段代码就变成了顺序执行的代码。正如我们在第一章中所看到的,根据阿姆达尔定律,顺序执行与并行执行的比例会限制代码的性能可扩展性。因此,我们必须尽量减少互斥锁被占用的时间。
如果在countLetters()函数开始时锁定互斥锁,而在函数执行完毕后释放它,会怎么样呢?从下面的代码可以看出:我们在调用该函数后立即锁定互斥锁,而在输出完成后的消息后释放互斥锁。
packagemainimport("fmt""io""net/http""strings""sync""time")constAllLetters="abcdefghijklmnopqrstuvwxyz"funcmain(){mutex:=sync.Mutex{}varfrequency=make([]int,26)fori:=1000;i<=1030;i++{url:=fmt.Sprintf("https://rfc-editor.org/rfc/rfc%d.txt",i)goCountLetters(url,frequency,&mutex)}time.Sleep(60*time.Second)mutex.Lock()fori,c:=rangeAllLetters{fmt.Printf("%c-%d ",c,frequency[i])}mutex.Unlock()}// CountLetters// Note: this program us locking the entire goroutine with mutex on purpose to demonstrate// bad placement of the lock and unlock. We fix this in the next listingfuncCountLetters(urlstring,frequency[]int,mutex*sync.Mutex){mutex.Lock()resp,_:=http.Get(url)deferresp.Body.Close()ifresp.StatusCode!=200{panic("Server returning error status code: "+resp.Status)}body,_:=io.ReadAll(resp.Body)for_,b:=rangebody{c:=strings.ToLower(string(b))cIndex:=strings.Index(AllLetters,c)ifcIndex>=0{frequency[cIndex]+=1}}fmt.Println("Completed:",url,time.Now().Format("15:04:05"))mutex.Unlock()}通过这种方式使用互斥锁,我们将原本的并发程序变成了顺序执行的程序。这样一来,我们每次只能下载并处理一个网页,因为整个程序的执行过程被无谓地阻塞了。如果继续运行这个程序,其耗时将与非并发版本的程序相同,只不过各网页的执行顺序会是随机的而已。
下图展示了一种使用三个协程来实现这种锁定机制的简化调度示意图。从图中可以看出,这些协程大部分时间都用于下载文档,而用于处理文档的时间则相对较短。(在图中,我们刻意缩小了下载与处理之间所需时间的比例,以便于说明问题。实际上,两者之间的时间差距要大得多。)总的来说,协程们几乎把所有时间都用在了下载文档上,而用于处理文档的时间则微乎其微。从性能角度来看,没有必要让整个执行过程都被阻塞。因为文档下载过程与其他协程无关,所以也就不存在发生竞争条件的风险。
提示:在决定何时以及如何使用互斥锁时,我们应先明确需要保护哪些资源,并确定各个临界区的起始点和结束点。之后,我们需要想办法尽量减少“Lock()”和“Unlock()”调用的次数。
packagelisting4_5import("fmt""io""net/http""strings""sync""time")constAllLetters="abcdefghijklmnopqrstuvwxyz"funcCountLetters(urlstring,frequency[]int,mutex*sync.Mutex){resp,_:=http.Get(url)deferresp.Body.Close()ifresp.StatusCode!=200{panic("Server returning error code: "+resp.Status)}body,_:=io.ReadAll(resp.Body)mutex.Lock()for_,b:=rangebody{c:=strings.ToLower(string(b))cIndex:=strings.Index(AllLetters,c)ifcIndex>=0{frequency[cIndex]+=1}}mutex.Unlock()fmt.Println("Completed:",url,time.Now().Format("15:04:05"))}在这个代码版本中,下载过程——也就是程序中耗时最长的部分——会以并行方式执行。而快速处理字母计数的任务则会在顺序模式下进行。通过这种方式,我们能够最大限度地提升程序的可扩展性:因为我们只对那些执行速度远远快于其他部分的代码段使用锁机制。
程序的执行过程如图所示。同样,为了便于观察,下载和处理所需的时间比例被夸大了。实际上,处理所需的时间远远少于下载网页所需的时间,因此加速效果更为显著。实际上,在我们的main()函数中,可以将等待时间缩短到几秒钟左右(之前是60秒)。
第二种解决方案比我们的第一次尝试要快得多。当锁定需要处理的代码量较小时,我们完成任务的效率会更高。这里的经验法则是:尽量减少持有互斥锁的时间,同时也要减少互斥锁的调用次数。回想一下阿姆达尔定律就不难理解这一点:如果代码中并行处理部分所占的时间更长,那么整体处理速度就会更快,系统的扩展性也会更好。
非阻塞式互斥锁
当一个协程调用 Lock() 函数时,如果互斥锁已被其他协程占用,那么该协程将会被阻塞。这就是所谓的“阻塞函数”:该协程的执行会停止,直到有另一个协程调用 Unlock() 函数为止。在某些应用中,我们会……
或许不必阻塞该协程,而是可以先执行其他任务,然后再尝试锁定互斥量并访问临界区。
正因如此,Go语言中的互斥锁提供了另一个名为TryLock()的函数。调用这个函数时,我们可能会遇到两种结果之一:
• 锁是可用的,这种情况下我们就能获取到它,此时函数会返回布尔值“true”。
• 该锁当前无法使用,因为另一个协程正在使用这个互斥锁。因此,该函数会立即返回,返回值为布尔值“false”。
非阻塞机制的用途
在Go语言的1.18版本中,新增了用于处理互斥锁的TryLock()函数。不过,关于这种非阻塞调用方式的实用示例其实很少见。这是因为在Go中,创建一个goroutine的成本远远低于在其他语言中创建内核级线程的成本。如果互斥锁不可用,让goroutine做其他事情并没有太大意义。因为在这种情况下,我们完全可以另开一个goroutine来执行任务,而无需等待互斥锁被释放。实际上,Go的互斥锁文档中也提到了这一点(来源:pkg.go.dev/sync#Mutex.TryLock)。
需要注意的是,虽然确实存在正确使用TryLock的情况,但这种情况相当罕见。通常来说,使用TryLock往往意味着在多线程编程中存在更严重的问题。
使用TryLock()的一个例子是:当需要监控某个任务的执行进度时,又不希望干扰该任务的正常进行。如果使用普通的Lock()机制,由于系统中有很多其他goroutine也在试图获取锁,那么就会给互斥锁带来不必要的负担,而这一切都是为了实现监控功能而已。而使用TryLock()的话,如果此时有别的goroutine正在占用互斥锁,那么监控用的goroutine就可以选择稍后再试。这就好比去邮局处理一件不紧急的事务时,看到门口排着长队,就决定改天再来办理吧。
我们可以修改我们的字母频率统计程序,让主goroutine在下载数据和处理文档的过程中,定期检查频率表的内容。
packagemainimport("fmt""io""net/http""strings""sync""time")funcmain(){mutex:=sync.Mutex{}varfrequency=make([]int,26)fori:=2000;i<=2200;i++{url:=fmt.Sprintf("https://rfc-editor.org/rfc/rfc%d.txt",i)goCountLetters(url,frequency,&mutex)}fori:=0;i<100;i++{time.Sleep(100*time.Millisecond)ifmutex.TryLock(){fori,c:=rangeAllLetters{fmt.Printf("%c-%d ",c,frequency[i])}fmt.Println()mutex.Unlock()}else{fmt.Println("Mutex already being used")}}}constAllLetters="abcdefghijklmnopqrstuvwxyz"funcCountLetters(urlstring,frequency[]int,mutex*sync.Mutex){resp,_:=http.Get(url)deferresp.Body.Close()ifresp.StatusCode!=200{panic("Server returning error code: "+resp.Status)}body,_:=io.ReadAll(resp.Body)mutex.Lock()for_,b:=rangebody{c:=strings.ToLower(string(b))cIndex:=strings.Index(AllLetters,c)ifcIndex>=0{frequency[cIndex]+=1}}mutex.Unlock()fmt.Println("Completed:",url,time.Now().Format("15:04:05"))}当我们运行代码时,可以从输出结果中看到:main()协程试图获取锁以打印出频率表。有时它能成功获取锁并完成打印;而有时则无法获取锁,此时它需要等待100毫秒后再尝试一次。
利用读写互斥锁提升性能
有时候,互斥锁的约束过于严格。可以把互斥锁看作是一种“粗暴”的工具:它通过阻止多个线程同时执行来解决问题。在任何时刻,都只能有一个goroutine能够进入受互斥锁保护的代码段。这种方式确实能有效避免竞争条件带来的问题,但对于某些应用来说,它可能会不必要地限制性能和可扩展性。读写互斥锁则是对标准互斥锁的一种改进:只有在我们需要更新共享资源时,它才会阻止其他线程的访问。使用读写互斥锁后,那些以读取操作为主的程序的性能可以得到提升,因为在这种情况下,对共享数据的读取操作远远多于更新操作。
Go语言中的读写互斥锁
如果所有处理客户请求的协程都能以非独占的方式访问该数据结构,那么它们就可以在需要时同时读取数据。这样可以提高性能,因为这样多个仅负责读取共享数据的协程就能同时访问该数据。只有当需要更新数据时,才会对共享数据的访问进行限制。在这个例子中,数据的更新频率很低(每秒几次),而读取数据的频率则非常高(每秒数千次)。因此,采用一种允许多个协程同时读取数据、但只能有单个协程进行写入的机制,会更有利。
这就是读写锁的作用。当我们只需读取共享资源而不对其进行修改时,读写锁允许多个协程同时执行只读操作。而当我们需要修改共享资源时,负责写入操作的协程会请求获得写锁,从而实现对该资源的独占访问。这一原理在上图中有所体现:图的左侧显示,读锁允许多个协程同时进行读取操作,但会阻止任何写入操作;而在图的右侧,一旦获得了写锁,那么所有的读取和写入操作都会被阻止,这与普通互斥锁的机制类似。
总结
• 互斥锁可以用来保护代码中的关键部分,防止它们被同时执行。
• 我们可以通过在关键代码段的开始处调用Lock()函数,在结束处调用unLock()函数来使用互斥锁保护这些关键部分。
• 如果锁定的时间过长,那么原本可以并行执行的代码就会变成顺序执行,从而降低性能。
• 我们可以通过调用 TryLock() 函数来检测某个互斥锁是否已被锁定。
• 读写互斥锁能够提升那些以读取操作为主的应用程序的性能。
• 读写互斥锁允许多个读取者协程同时执行临界代码段,同时确保只有单个写入者协程能够访问该资源。
• 我们可以使用一个计数器以及两个普通的互斥锁来构建一种“以读取操作为主的读写互斥机制”。
