当前位置: 首页 > news >正文

深入了解Python并发编程

并发方式

线程([Thread])

多线程几乎是每一个程序猿在使用每一种语言时都会首先想到用于解决并发的工具(JS程序员请回避),使用多线程可以有效的利用CPU资源(Python例外)。然而多线程所带来的程序的复杂度也不可避免,尤其是对竞争资源的同步问题。

然而在python中由于使用了全局解释锁(GIL)的原因,代码并不能同时在多核上并发的运行,也就是说,Python的多线程不能并发,很多人会发现使用多线程来改进自己的Python代码后,程序的运行效率却下降了,这是多么蛋疼的一件事呀!实际上使用多线程的编程模型是很困难的,程序员很容易犯错,这并不是程序员的错误,因为并行思维是反人类的,我们大多数人的思维是串行(精神分裂不讨论),而且冯诺依曼设计的计算机架构也是以顺序执行为基础的。所以如果你总是不能把你的多线程程序搞定,恭喜你,你是个思维正常的程序猿:)

Python提供两组线程的接口,一组是thread模块,提供基础的,低等级(Low Level)接口,使用Function作为线程的运行体。还有一组是threading模块,提供更容易使用的基于对象的接口(类似于Java),可以继承Thread对象来实现线程,还提供了其它一些线程相关的对象,例如Timer,Lock

使用thread模块的例子

1

2

3

4

5

importthread

defworker():

"""thread worker function"""

print'Worker'

thread.start_new_thread(worker)

使用threading模块的例子

1

2

3

4

5

6

importthreading

defworker():

"""thread worker function"""

print'Worker'

t=threading.Thread(target=worker)

t.start()

或者Java Style

1

2

3

4

5

6

7

8

9

10

importthreading

classworker(threading.Thread):

def__init__(self):

pass

defrun():

"""thread worker function"""

print'Worker'

t=worker()

t.start()

进程 (Process)

由于前文提到的全局解释锁的问题,Python下比较好的并行方式是使用多进程,这样可以非常有效的使用CPU资源,并实现真正意义上的并发。当然,进程的开销比线程要大,也就是说如果你要创建数量惊人的并发进程的话,需要考虑一下你的机器是不是有一颗强大的心。

Python的mutliprocess模块和threading具有类似的接口。

1

2

3

4

5

6

7

8

frommultiprocessingimportProcess

defworker():

"""thread worker function"""

print'Worker'

p=Process(target=worker)

p.start()

p.join()

由于线程共享相同的地址空间和内存,所以线程之间的通信是非常容易的,然而进程之间的通信就要复杂一些了。常见的进程间通信有,管道,消息队列,Socket接口(TCP/IP)等等。

Python的mutliprocess模块提供了封装好的管道和队列,可以方便的在进程间传递消息。

Python进程间的同步使用锁,这一点喝线程是一样的。

另外,Python还提供了进程池Pool对象,可以方便的管理和控制线程。

远程分布式主机 (Distributed Node)

随着大数据时代的到临,摩尔定理在单机上似乎已经失去了效果,数据的计算和处理需要分布式的计算机网络来运行,程序并行的运行在多个主机节点上,已经是现在的软件架构所必需考虑的问题。

远程主机间的进程间通信有几种常见的方式

  • TCP/IP

TCP/IP是所有远程通信的基础,然而API比较低级别,使用起来比较繁琐,所以一般不会考虑

  • 远程方法调用 Remote Function Call

[RPC]

  • 远程对象 Remote Object

远程对象是更高级别的封装,程序可以想操作本地对象一样去操作一个远程对象在本地的代理。远程对象最广为使用的规范CORBA,CORBA最大的好处是可以在不同语言和平台中进行通信。当让不用的语言和平台还有一些各自的远程对象实现,例如Java的RMI,MS的DCOM

Python的开源实现,有许多对远程对象的支持

  • Dopy]
  • Fnorb (CORBA)
  • ICE
  • omniORB (CORBA)
  • Pyro
  • YAMI
  • 消息队列 Message Queue

比起RPC或者远程对象,消息是一种更为灵活的通信手段,常见的支持Python接口的消息机制有

  • RabbitMQ
  • ZeroMQ
  • Kafka
  • AWS SQS + BOTO

在远程主机上执行并发和本地的多进程并没有非常大的差异,都需要解决进程间通信的问题。当然对远程进程的管理和协调比起本地要复杂。

Python下有许多开源的框架来支持分布式的并发,提供有效的管理手段包括:

  • Celery

Celery是一个非常成熟的Python分布式框架,可以在分布式的系统中,异步的执行任务,并提供有效的管理和调度功能。

  • SCOOP

SCOOP (Scalable COncurrent Operations in Python)提供简单易用的分布式调用接口,使用Future接口来进行并发。

  • Dispy

相比起Celery和SCOOP,Dispy提供更为轻量级的分布式并行服务

  • PP

PP (Parallel Python)是另外一个轻量级的Python并行服务

  • Asyncoro

Asyncoro是另一个利用Generator实现分布式并发的Python框架,

当然还有许多其它的系统,我没有一一列出

另外,许多的分布式系统多提供了对Python接口的支持,例如Spark

伪线程 (Pseudo-Thread)

还有一种并发手段并不常见,我们可以称之为伪线程,就是看上去像是线程,使用的接口类似线程接口,但是实际使用非线程的方式,对应的线程开销也不存的。

  • greenlet

greenlet提供轻量级的coroutines来支持进程内的并发。

greenlet是Stackless的一个副产品,使用tasklet来支持一中被称之为微线程(mirco-thread)的技术,这里是一个使用greenlet的伪线程的例子

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

fromgreenletimportgreenlet

deftest1():

print12

gr2.switch()

print34

deftest2():

print56

gr1.switch()

print78

gr1=greenlet(test1)

gr2=greenlet(test2)

gr1.switch()

运行以上程序得到如下结果:

12
56
34

伪线程gr1 switch会打印12,然后调用gr2 switch得到56,然后switch回到gr1,打印34,然后伪线程gr1结束,程序退出,所以78永远不会被打印。通过这个例子我们可以看出,使用伪线程,我们可以有效的控制程序的执行流程,但是伪线程并不存在真正意义上的并发。

eventlet,gevent和concurence都是基于greenlet提供并发的。

  • eventlet

eventlet是一个提供网络调用并发的Python库,使用者可以以非阻塞的方式来调用阻塞的IO操作。

1

2

3

4

5

6

7

8

9

10

11

12

importeventlet

fromeventlet.greenimporturllib2

urls=['http://www.google.com','http://www.example.com','http://www.python.org']

deffetch(url):

returnurllib2.urlopen(url).read()

pool=eventlet.GreenPool()

forbodyinpool.imap(fetch, urls):

print("got body",len(body))

执行结果如下

('got body', 17629)
('got body', 1270)
('got body', 46949)

eventlet为了支持generator的操作对urllib2做了修改,接口和urllib2是一致的。这里的GreenPool和Python的Pool接口一致。

  • gevent

gevent和eventlet类似,

1

2

3

4

5

6

7

importgevent

fromgeventimportsocket

urls=['www.google.com','www.example.com','www.python.org']

jobs=[gevent.spawn(socket.gethostbyname, url)forurlinurls]

gevent.joinall(jobs, timeout=2)

print[job.valueforjobinjobs]

执行结果如下:

['206.169.145.226', '93.184.216.34', '23.235.39.223']

  • concurence

concurence是另外一个利用greenlet提供网络并发的开源库,我没有用过,大家可以自己尝试一下。

实战运用

通常需要用到并发的场合有两种,一种是计算密集型,也就是说你的程序需要大量的CPU资源;另一种是IO密集型,程序可能有大量的读写操作,包括读写文件,收发网络请求等等。

计算密集型

对应计算密集型的应用,我们选用著名的蒙特卡洛算法来计算PI值。基本原理如下

蒙特卡洛算法利用统计学原理来模拟计算圆周率,在一个正方形中,一个随机的点落在1/4圆的区域(红色点)的概率与其面积成正比。也就该概率 p = Pi * R*R /4 : R* R , 其中R是正方形的边长,圆的半径。也就是说该概率是圆周率的1/4, 利用这个结论,只要我们模拟出点落在四分之一圆上的概率就可以知道圆周率了,为了得到这个概率,我们可以通过大量的实验,也就是生成大量的点,看看这个点在哪个区域,然后统计出结果。

基本算法如下:

1

2

3

4

5

frommathimporthypot

fromrandomimportrandom

deftest(tries):

returnsum(hypot(random(), random()) <1for_inrange(tries))

这里test方法做了n(tries)次试验,返回落在四分之一圆中的点的个数。判断方法是检查该点到圆心的距离,如果小于R则是在圆上。

通过大量的并发,我们可以快速的运行多次试验,试验的次数越多,结果越接近真实的圆周率。

这里给出不同并发方法的程序代码

  • 非并发

我们先在单线程,但进程运行,看看性能如何

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

frommathimporthypot

fromrandomimportrandom

importeventlet

importtime

deftest(tries):

returnsum(hypot(random(), random()) <1for_inrange(tries))

defcalcPi(nbFutures, tries):

ts=time.time()

result=map(test, [tries]*nbFutures)

ret=4.*sum(result)/float(nbFutures*tries)

span=time.time()-ts

print"time spend ", span

returnret

printcalcPi(3000,4000)

  • 多线程 thread

为了使用线程池,我们用multiprocessing的dummy包,它是对多线程的一个封装。注意这里代码虽然一个字的没有提到线程,但它千真万确是多线程。

通过测试我们开(jing)心(ya)的发现,果然不出所料,当线程池为1是,它的运行结果和没有并发时一样,当我们把线程池数字设置为5时,耗时几乎是没有并发的2倍,我的测试数据从5秒到9秒。所以对于计算密集型的任务,还是放弃多线程吧。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

frommultiprocessing.dummyimportPool

frommathimporthypot

fromrandomimportrandom

importtime

deftest(tries):

returnsum(hypot(random(), random()) <1for_inrange(tries))

defcalcPi(nbFutures, tries):

ts=time.time()

p=Pool(1)

result=p.map(test, [tries]*nbFutures)

ret=4.*sum(result)/float(nbFutures*tries)

span=time.time()-ts

print"time spend ", span

returnret

if__name__=='__main__':

p=Pool()

print("pi = {}".format(calcPi(3000,4000)))

  • 多进程 multiprocess

理论上对于计算密集型的任务,使用多进程并发比较合适,在以下的例子中,进程池的规模设置为5,修改进程池的大小可以看到对结果的影响,当进程池设置为1时,和多线程的结果所需的时间类似,因为这时候并不存在并发;当设置为2时,响应时间有了明显的改进,是之前没有并发的一半;然而继续扩大进程池对性能影响并不大,甚至有所下降,也许我的Apple Air的CPU只有两个核?


http://www.jsqmd.com/news/792191/

相关文章:

  • 如何通过Noto Emoji实现跨平台表情符号统一:技术原理与应用实践
  • Qt/C++实战:手把手教你用QCustomPlot实现动态刷新热力图(模拟实时数据)
  • MySQL高级特性:索引优化详解
  • 2026年4月优质的初中效袋式过滤器批发厂家推荐,防潮设计适应潮湿环境 - 品牌推荐师
  • Redis数据结构与性能优化详解
  • 使用本地浏览器打开远程服务器生成的网页——详细教程
  • 打破语言壁垒:Translumo屏幕实时翻译工具的终极使用指南
  • 2026 年 Q1 全球互联网中断报告:断网、停电与战争
  • 20253221 2025-2026-2 《Python程序设计》实验3报告
  • Python函数中的全局变量详解
  • 量子计算机来了,你的企业网络隧道还安全吗?
  • PostgreSQL高级特性详解
  • Redis学习8 Redis数据结构(1)
  • 基于Vue.js与AI对话的智能思维导图生成器开发实践
  • 终极解决方案:如何快速批量转换GBK到UTF-8编码文件
  • 一次例行密钥轮换,让数百万德国域名集体蒸发
  • 独立开发者工具箱:2026年全栈与AI应用高效开发技术栈指南
  • MongoDB聚合与查询优化详解
  • 如何在 Docker 容器中部署企业微信机器人服务保证高可用
  • 31_AI短片实战第四弹:主观视角空间控制与分屏快速剪辑的AI生成策略(附提示词)
  • 高管求职渠道公司实测:4家机构核心能力对比评测 - 得赢
  • 两次全球宕机之后,Cloudflare 用半年时间重建了什么
  • 2026届最火的AI写作平台推荐榜单
  • Logseq AI助手插件:在知识管理笔记中集成ChatGPT智能写作与编辑
  • hls::stream<ap_uint<DW * NPPC>> src,报错原因分析
  • 32_AI短片实战第五弹:飞跃峡谷——高潮镜头的“放手”哲学与首帧脑补策略(附提示词)
  • DeepSeek V4 横向对比真实表现
  • 终极指南:如何用NPYViewer快速查看和可视化NumPy数组数据
  • YOLO11进阶技巧:数据增强策略 | 舍弃传统Mosaic,引入Copy-Paste与MixUp混合数据增强,有效缓解过拟合
  • R7000P梅林固件进阶玩法:解锁软件中心、挂载U盘与插件安装全攻略